OpenINTEL.nl -- PySpark notebook
Author: Mattijs Jonker <m.jonker[at]utwente.nl>
License of this notebook: CC BY-SA 4.0
Example code to load public OpenINTEL forward DNS (fDNS) data, using either the AWS SDK for Python (boto3) or PySpark.
Before using our data, please read our terms of use.
Imports
In [1]:
import os
import datetime
import fnmatch
import tempfile
import time
import dateutil.rrule
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as psf
import boto3
import boto3.s3.transfer
import botocore.config
import botocore.utils
import pandas as pd
Global Configuration
In [2]:
# The OpenINTEL repository endpoint (S3-capable)
OI_ENDPOINT = "https://object.openintel.nl"
# The forward DNS (fDNS) bucket and the base paths of the measurement data
OI_BUCKET_NAME = "openintel-public"
OI_FDNS_BASE = "fdns"
OI_FDNS_ZONEBASED = os.path.join(OI_FDNS_BASE, "basis=zonefile") # zonefile-based fDNS measurement data
OI_FDNS_LISTBASED = os.path.join(OI_FDNS_BASE, "basis=toplist") # toplist-based fDNS measurement data
OI_OBJECT_SUFFIX = "gz.parquet"
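Objects in the bucket are laid out in Hive-style partitions (source, year, month, day) under these base paths. A minimal sketch, with illustrative example values, of the key prefix that the download code below composes:
# Illustrative sketch: compose a partition prefix from the constants above
example_prefix = os.path.join(OI_FDNS_ZONEBASED, "source=li", "year=2024", "month=10", "day=07")
print(example_prefix)  # fdns/basis=zonefile/source=li/year=2024/month=10/day=07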
Approach 1: Read OpenINTEL data (public) with the Amazon Web Services (AWS) SDK for Python (boto3)
Define boto3 Resource and Client
In [3]:
# Get a boto Resource
S3R_OPENINTEL = boto3.resource(
"s3",
"nl-utwente",
endpoint_url = OI_ENDPOINT,
# We're using anonymous access
config = botocore.config.Config(
signature_version = botocore.UNSIGNED,
)
)
# Get its client, for lower-level actions, if needed
S3C_OPENINTEL = S3R_OPENINTEL.meta.client
# Prevent some requests from going to AWS instead of our server
S3C_OPENINTEL.meta.events.unregister('before-sign.s3', botocore.utils.fix_s3_host)
# The OpenINTEL data bucket
OI_BUCKET = S3R_OPENINTEL.Bucket(OI_BUCKET_NAME)
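As an optional sanity check that anonymous access works against the endpoint, a small sketch that lists a handful of object keys under the fDNS base prefix:
# Optional sanity check: list a few object keys under the fDNS base prefix
sanity_lo = S3C_OPENINTEL.list_objects_v2(Bucket=OI_BUCKET_NAME, Prefix=OI_FDNS_BASE + "/", MaxKeys=5)
for i_obj in sanity_lo.get("Contents", []):
    print(i_obj["Key"], i_obj["Size"])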
Configure data of interest
In [4]:
# The date(s) to download
DO_DATES = list(dateutil.rrule.rrule(dateutil.rrule.WEEKLY, dtstart=datetime.datetime(2024, 10, 7), until=datetime.datetime(2024, 10, 28)))
# The public source(s) to download (see index: https://openintel.nl/download/forward-dns/)
DO_SOURCES = {
OI_FDNS_ZONEBASED : [
"li",
"gov"
],
OI_FDNS_LISTBASED : [
#"tranco"
]
}
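For reference, the weekly rrule above expands to four Mondays; a quick way to inspect the selection:
# Inspect the selected measurement dates
print([i_date.date().isoformat() for i_date in DO_DATES])
# ['2024-10-07', '2024-10-14', '2024-10-21', '2024-10-28']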
Read data into a Pandas DataFrame
Note: this code is for demonstration purposes only and will not scale. For a multi-source, longitudinal analysis, we recommend downloading the files separately (not onto the example Docker container's filesystem) and devising an approach to process the data, for example by running your analysis and creating aggregate data points for each (source, date) separately (if possible), and then combining the results into a time series. A lighter-weight accumulation pattern is sketched after the output below.
In [5]:
all_data = [] # list of dicts with data from individual objects
# Iter dates
for i_date in DO_DATES:
# Iter source bases
for i_source_base in DO_SOURCES.keys():
# Iter base's sources
if len(DO_SOURCES[i_source_base]) > 0:
for i_source in DO_SOURCES[i_source_base]:
# Build a partition path
                i3_data_partition_prefix = os.path.join(i_source_base, "source={}".format(i_source), "year={}".format(i_date.year), "month={:02d}".format(i_date.month), "day={:02d}".format(i_date.day))
# List objects under partition path
                i3_s3_lo = S3C_OPENINTEL.list_objects_v2(Bucket=OI_BUCKET.name, Prefix=i3_data_partition_prefix + "/", Delimiter="/")
# Are there objects?
if "Contents" in i3_s3_lo:
# Iterate objects
for i_content in i3_s3_lo["Contents"]:
# Load data at given key
if fnmatch.fnmatch(i_content["Key"], "*." + OI_OBJECT_SUFFIX):
# Open a temporary file to download the object into
with tempfile.NamedTemporaryFile(mode="w+b", prefix="{}.".format(i_date.date().isoformat()), suffix="." + OI_OBJECT_SUFFIX, delete=True) as tempFile:
print("Opened temporary file for object download: '{}'.".format(tempFile.name))
# Download file object
OI_BUCKET.download_fileobj(Key=i_content["Key"], Fileobj=tempFile, Config=boto3.s3.transfer.TransferConfig(multipart_chunksize = 16*1024*1024))
print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(os.path.join(OI_BUCKET.name, i_content["Key"]), os.path.getsize(tempFile.name) / (1024*1024), tempFile.name))
## Use Pandas to read file into a DF and append to list
# n.b.: this isn't exactly efficient
i_obj_pdf = pd.read_parquet(
path = tempFile.name,
# We read only the columns we are interested in (see: https://openintel.nl/data/dictionary/)
columns = ["query_name", "response_type"]
)
# Add partition columns (these columns are not contained in the files themselves)
i_obj_pdf["source"] = i_source
i_obj_pdf["year"] = i_date.year
i_obj_pdf["month"] = i_date.month
i_obj_pdf["day"] = i_date.day
# Extend list of data records
all_data.extend(
# We use a list of dictionaries for performance
i_obj_pdf.to_dict(orient='records')
)
del i_obj_pdf
# Create Pandas DF using data records from objects
pandas_df = pd.DataFrame.from_records(all_data)
print("done: read {} records".format(len(pandas_df)))
Opened temporary file for object download: '/tmp/2024-10-07.8k0_7dce.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=07/part-00006-8108fb65-6cc1-4f73-afa3-c8a628f32ccd.c000.gz.parquet' [42.34MiB] into '/tmp/2024-10-07.8k0_7dce.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-07.i11ad2j8.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=07/part-00006-ee198b1e-28db-4aac-b1bf-f3c8eb317fed.c000.gz.parquet' [7.65MiB] into '/tmp/2024-10-07.i11ad2j8.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-07.ygm57xmh.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=07/part-00079-e6af13d6-a0f9-4d3b-8e7b-fa5c38cb5daf.c000.gz.parquet' [7.59MiB] into '/tmp/2024-10-07.ygm57xmh.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-14.u5l__4_e.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=14/part-00012-329ee1d9-0097-41c3-9cc8-a655f8df6ff9.c000.gz.parquet' [42.43MiB] into '/tmp/2024-10-14.u5l__4_e.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-14.ght6zqtm.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=14/part-00000-098d9968-58f1-4fea-ad3c-ba97515cba8c.c000.gz.parquet' [7.57MiB] into '/tmp/2024-10-14.ght6zqtm.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-14.dheqnsfy.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=14/part-00077-9fab0adc-2449-45cb-97e6-6c1b72afc2a7.c000.gz.parquet' [7.57MiB] into '/tmp/2024-10-14.dheqnsfy.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-21.hhxav3jm.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=21/part-00006-5db8a29e-1e0f-4871-8271-b6bf32260b3d.c000.gz.parquet' [42.40MiB] into '/tmp/2024-10-21.hhxav3jm.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-21.qigyp12s.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=21/part-00002-20075ec6-e3e9-4921-a837-c75519fd15e7.c000.gz.parquet' [7.56MiB] into '/tmp/2024-10-21.qigyp12s.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-21.0tg0on3a.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=21/part-00020-ddb48036-b9c3-43dd-9f23-a879cd81c791.c000.gz.parquet' [7.56MiB] into '/tmp/2024-10-21.0tg0on3a.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-28.sq6txvur.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=28/part-00009-99b37179-bfb8-42ce-b54e-2e35b9d1146b.c000.gz.parquet' [42.43MiB] into '/tmp/2024-10-28.sq6txvur.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-28.p6dp9mi1.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=28/part-00003-6af422bf-61ae-44d9-a1f5-560ab39a8744.c000.gz.parquet' [7.72MiB] into '/tmp/2024-10-28.p6dp9mi1.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-28.6y2nxagi.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=28/part-00057-14e5aea2-df32-45ce-a3dd-2524ce04e479.c000.gz.parquet' [7.71MiB] into '/tmp/2024-10-28.6y2nxagi.gz.parquet'.
done: read 5557482 records
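As noted above, exploding every object into a list of per-record dictionaries is costly at scale. A lighter-weight accumulation pattern, sketched under the assumption that the download loop otherwise stays the same, keeps the per-object DataFrames and concatenates them once:
# Sketch: collect per-object DataFrames and concatenate once, instead of
# extending a list of dicts record by record. Inside the loop above, replace
# the all_data.extend(...) call with:
#   all_frames.append(i_obj_pdf)
all_frames = []
# ... (download loop as above, appending each i_obj_pdf to all_frames) ...
if all_frames:
    pandas_df = pd.concat(all_frames, ignore_index=True)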
Simple example aggregate
Number of registered domains per (source, date) partition
In [6]:
# Count number of registered domains per source per day
pandas_df.query(
    # The data contains the SOA response_type only for the zone apex (@), i.e., the registered domain
"response_type == 'SOA'"
# Group by (source, date) partition
).groupby(
["source", "year", "month", "day"]
# Count the number of names for the aggregate key
).agg(
domain_count = ('query_name', 'count')
)
Out[6]:
                       domain_count
source year month day
gov    2024 10    7           25763
                  14          25917
                  21          26079
                  28          26209
li     2024 10    7           85159
                  14          85289
                  21          85287
                  28          85259
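To move toward the time series mentioned in the note above, the same aggregate can be flattened and given a datetime index; a minimal sketch on top of pandas_df:
# Sketch: flatten the aggregate and index it by date
ts = pandas_df.query("response_type == 'SOA'").groupby(
    ["source", "year", "month", "day"], as_index=False
).agg(domain_count=("query_name", "count"))
ts["date"] = pd.to_datetime(ts[["year", "month", "day"]])
print(ts.pivot(index="date", columns="source", values="domain_count"))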
Approach 2: Read OpenINTEL data (public) with Apache Spark (PySpark)
Spark Configuration
In [7]:
# Create Spark Config
sparkConf = SparkConf()
sparkConf.setMaster("local[1]")
sparkConf.setAppName("pyspark-{}-{}".format(os.getuid(), int(time.time())))
# executors
sparkConf.set("spark.executor.instances", "1")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.executor.memory", "4G")
sparkConf.set("spark.executor.memoryOverhead", "512M")
# driver
sparkConf.set("spark.driver.cores", "1")
sparkConf.set("spark.driver.memory", "2G")
# OpenINTEL object storage (S3A) settings
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sparkConf.set("fs.s3a.endpoint", OI_ENDPOINT)
sparkConf.set("fs.s3a.connection.ssl.enabled", "true")
sparkConf.set("fs.s3a.signing-algorithm", "S3SignerType")
sparkConf.set("fs.s3a.path.style.access", "true")
sparkConf.set("fs.s3a.block.size", "16M")
sparkConf.set("fs.s3a.readahead.range", "1M")
sparkConf.set("fs.s3a.experimental.input.fadvise", "normal")
sparkConf.set("io.file.buffer.size", "67108864")
sparkConf.set("spark.buffer.size", "67108864")
# Parquet I/O performance settings for cloud integration
sparkConf.set("spark.hadoop.parquet.summary.metadata.level", "NONE")
sparkConf.set("spark.sql.parquet.mergeSchema", "false")
sparkConf.set("spark.sql.parquet.filterPushdown", "true")
sparkConf.set("spark.sql.hive.metastorePartitionPruning", "true")
print("SparkConf created")
SparkConf created
Create SparkContext
In [8]:
# Initialize our Spark Session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
print("Started SparkContext")
Started SparkContext
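Optionally, verify that the session carries the settings defined above (a trivial check against the Spark configuration):
# Optional sanity check: the session is up and carries our settings
print(spark.version)
print(sc.getConf().get("spark.executor.memory"))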
Configure data of interest
In [9]:
# The date(s) to download
DO_DATES = list(dateutil.rrule.rrule(dateutil.rrule.WEEKLY, dtstart=datetime.datetime(2024, 10, 7), until=datetime.datetime(2024, 10, 28)))
# The public source(s) to download (see index: https://openintel.nl/download/forward-dns/)
DO_SOURCES = {
OI_FDNS_ZONEBASED : [
"li",
"gov"
],
OI_FDNS_LISTBASED : [
#"tranco"
]
}
Define Spark DataFrame
In [10]:
my_oi_fdns_df = spark.read.option("basePath", "s3a://{}/{}/".format(OI_BUCKET_NAME, OI_FDNS_BASE)).parquet(
*[
# All the partition paths for the provided sources and dates
os.path.join("s3a://{}/{}".format(OI_BUCKET.name, i_fdns_source_base), "source={}".format(i_source), "year={}".format(i_date.year), "month={:02d}".format(i_date.month), "day={:02d}".format(i_date.day))
for i_fdns_source_base in DO_SOURCES.keys()
for i_source in DO_SOURCES[i_fdns_source_base]
for i_date in DO_DATES
]
)
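Note that nothing is read at this point; Spark only resolves the partition paths. The schema, including the partition columns (source, year, month, day) recovered through the basePath option, can be inspected before triggering any I/O:
# Inspect the schema; the partition columns are recovered via the basePath option
my_oi_fdns_df.printSchema()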
Simple example aggregate (similar to above)
In [12]:
my_oi_fdns_df.filter(
    # The data contains the SOA response_type only for the zone apex (@), i.e., the registered domain
psf.col("response_type") == "SOA"
).groupby(
# Group by (source, date) partition
["source", "year", "month", "day"]
).agg(
# Count the number of names for the aggregate key
psf.count("query_name").alias("domain_count")
).orderBy("source", "year", "month", "day").show(10)
+------+----+-----+---+------------+
|source|year|month|day|domain_count|
+------+----+-----+---+------------+
|   gov|2024|   10|  7|       25763|
|   gov|2024|   10| 14|       25917|
|   gov|2024|   10| 21|       26079|
|   gov|2024|   10| 28|       26209|
|    li|2024|   10|  7|       85159|
|    li|2024|   10| 14|       85289|
|    li|2024|   10| 21|       85287|
|    li|2024|   10| 28|       85259|
+------+----+-----+---+------------+
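Since this aggregate is small, it can also be pulled to the driver for further processing in pandas; a minimal sketch:
# Pull the (small) aggregate to the driver as a pandas DataFrame
agg_pdf = my_oi_fdns_df.filter(
    psf.col("response_type") == "SOA"
).groupBy("source", "year", "month", "day").agg(
    psf.count("query_name").alias("domain_count")
).toPandas()
print(agg_pdf.sort_values(["source", "year", "month", "day"]).head())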
Stop SparkContext
In [13]:
sc.stop()
print("Stopped SparkContext")
Stopped SparkContext