OpenINTEL.nl -- PySpark notebook

Author: Mattijs Jonker <m.jonker[at]utwente.nl>

License of this notebook: CC BY-SA 4.0

Example code to load public OpenINTEL forward DNS (fDNS) data, using either the AWS SDK for Python (boto3) or PySpark.

Before using our data, please read our terms of use here

CHANGELOG:

  • v1.0.1 -- Increased multipart chunksize to reduce the risk of triggering request rate-limiting
  • v1.0.0 -- Initial version of notebook

Imports

In [1]:
import os
import datetime
import fnmatch
import tempfile
import time
import dateutil.rrule  # explicitly import the rrule submodule (plain "import dateutil" does not expose it)

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as psf

import boto3
import botocore
import pandas as pd

Global Configuration

In [2]:
# The OpenINTEL repository endpoint (S3-capable)
OI_ENDPOINT = "https://object.openintel.nl"

# The forward DNS (fDNS) bucket and measurement data bases
OI_BUCKET_NAME = "openintel-public"
OI_FDNS_BASE = "fdns"
OI_FDNS_ZONEBASED = os.path.join(OI_FDNS_BASE, "basis=zonefile") # zonefile-based fDNS measurement data
OI_FDNS_LISTBASED = os.path.join(OI_FDNS_BASE, "basis=toplist")  # toplist-based fDNS measurement data
OI_OBJECT_SUFFIX = "gz.parquet"
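
The bucket uses Hive-style partitioning, as the `basis=` directory names above suggest. For illustration only (this cell is not part of the original notebook), the constants compose into a single (source, date) partition prefix like so:

# Illustration: the partition prefix for source "li" on 2024-10-07
example_prefix = os.path.join(
    OI_FDNS_ZONEBASED,
    "source=li",
    "year=2024",
    "month=10",
    "day=07"
)
print(example_prefix)  # fdns/basis=zonefile/source=li/year=2024/month=10/day=07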

Approach 1: Read OpenINTEL data (public) with the Amazon Web Services (AWS) SDK for Python (boto3)

Define boto3 Resource and Client

In [3]:
# Get a boto3 Resource
S3R_OPENINTEL = boto3.resource(
    "s3",
    region_name           = "nl-utwente",
    endpoint_url          = OI_ENDPOINT,
    # We're using anonymous (unsigned) access
    config = botocore.config.Config(
        signature_version = botocore.UNSIGNED,
    )
)

# Get its client, for lower-level actions, if needed
S3C_OPENINTEL = S3R_OPENINTEL.meta.client

# Prevent some requests from going to AWS instead of our server
S3C_OPENINTEL.meta.events.unregister('before-sign.s3', botocore.utils.fix_s3_host)

# The OpenINTEL data bucket
OI_BUCKET = S3R_OPENINTEL.Bucket(OI_BUCKET_NAME)
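
As a quick sanity check (not part of the original notebook), the client can list a handful of keys under the fDNS base to confirm that anonymous access works:

# Optional sanity check: list a few object keys under the fDNS base
i_lo = S3C_OPENINTEL.list_objects_v2(Bucket=OI_BUCKET_NAME, Prefix=OI_FDNS_BASE + "/", MaxKeys=5)
for i_obj in i_lo.get("Contents", []):
    print(i_obj["Key"])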

Configure data of interest

In [4]:
# The date(s) to download
DO_DATES = list(dateutil.rrule.rrule(dateutil.rrule.WEEKLY, dtstart=datetime.datetime(2024, 10, 7), until=datetime.datetime(2024, 10, 28)))

# The public source(s) to download (see index: https://openintel.nl/download/forward-dns/)
DO_SOURCES = {
    OI_FDNS_ZONEBASED : [
        "li",
        "gov"
    ],
    OI_FDNS_LISTBASED : [
        #"tranco"
    ]   
}
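
For reference, the weekly rrule above expands to four Mondays, matching the day=07/14/21/28 partitions downloaded below; this can be verified with:

# Inspect the generated measurement dates
print([i_date.date().isoformat() for i_date in DO_DATES])
# ['2024-10-07', '2024-10-14', '2024-10-21', '2024-10-28']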

Read data into a Pandas DataFrame

note: this code is for demonstration purposes only and will not scale. For a multi-source, longitudinal analysis, we recommend downloading the files separately (not into the example Docker container's filesystem) and devising an approach to process the data piecewise, for example by running your analysis and aggregate data point creation over each (source, date) separately (if possible) and then combining the results into a time series, as the sketch below illustrates.
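
A minimal sketch of that per-partition approach (the helper name and variables are hypothetical, not part of this notebook): reduce each downloaded file to a single data point immediately, keep only those small results, and combine them afterwards.

# Hypothetical helper: reduce one (source, date) parquet file to a single data point
def aggregate_partition(parquet_path, source, date):
    pdf = pd.read_parquet(parquet_path, columns=["query_name", "response_type"])
    return {
        "source": source,
        "date": date,
        # One SOA response per registered domain (see the aggregate example below)
        "domain_count": int((pdf["response_type"] == "SOA").sum()),
    }

# Combining the per-partition results then yields a compact time series:
# timeseries_df = pd.DataFrame.from_records(per_partition_results)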

In [5]:
all_data = [] # list of dicts with data from individual objects
# Iter dates
for i_date in DO_DATES:

    # Iter source bases
    for i_source_base in DO_SOURCES.keys():

        # Iter base's sources
        if len(DO_SOURCES[i_source_base]) > 0:
            for i_source in DO_SOURCES[i_source_base]:
    
                # Build a partition path
                i3_data_partition_prefix = os.path.join(
                    i_source_base,
                    "source={}".format(i_source),
                    "year={}".format(i_date.year),
                    "month={:02d}".format(i_date.month),
                    "day={:02d}".format(i_date.day)
                )

                # List objects under the partition path
                i3_s3_lo = S3C_OPENINTEL.list_objects_v2(Bucket=OI_BUCKET.name, Prefix=i3_data_partition_prefix + "/", Delimiter="/")

                # Are there objects?
                if "Contents" in i3_s3_lo:

                    # Iterate objects
                    for i_content in i3_s3_lo["Contents"]:
                        
                        # Load data at given key
                        if fnmatch.fnmatch(i_content["Key"], "*." + OI_OBJECT_SUFFIX):

                            # Open a temporary file to download the object into
                            with tempfile.NamedTemporaryFile(mode="w+b", prefix="{}.".format(i_date.date().isoformat()), suffix="." + OI_OBJECT_SUFFIX, delete=True) as tempFile:
                        
                                print("Opened temporary file for object download: '{}'.".format(tempFile.name))
                                
                                # Download file object
                                OI_BUCKET.download_fileobj(
                                    Key = i_content["Key"],
                                    Fileobj = tempFile,
                                    # Please note that a small chunksize may trigger the request rate limiter
                                    Config = boto3.s3.transfer.TransferConfig(multipart_chunksize = 64*1024*1024)
                                )
                                print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(os.path.join(OI_BUCKET.name, i_content["Key"]), os.path.getsize(tempFile.name) / (1024*1024), tempFile.name))
                                
                                ## Use Pandas to read file into a DF and append to list
                                # n.b.: this isn't exactly efficient
                                i_obj_pdf = pd.read_parquet(
                                    path = tempFile.name,
                                    # We read only the columns we are interested in (see: https://openintel.nl/background/dictionary/)
                                    columns = ["query_name", "response_type"]
                                )
                                # Add partition columns (these columns are not contained in the files themselves)
                                i_obj_pdf["source"] = i_source
                                i_obj_pdf["year"] = i_date.year
                                i_obj_pdf["month"] = i_date.month
                                i_obj_pdf["day"] = i_date.day

                                # Extend list of data records
                                all_data.extend(
                                    # We use a list of dictionaries for performance
                                    i_obj_pdf.to_dict(orient='records')
                                )
                                del i_obj_pdf


# Create Pandas DF using data records from objects
pandas_df = pd.DataFrame.from_records(all_data)
print("done: read {} records".format(len(pandas_df)))
Opened temporary file for object download: '/tmp/2024-10-07.8k0_7dce.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=07/part-00006-8108fb65-6cc1-4f73-afa3-c8a628f32ccd.c000.gz.parquet' [42.34MiB] into '/tmp/2024-10-07.8k0_7dce.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-07.i11ad2j8.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=07/part-00006-ee198b1e-28db-4aac-b1bf-f3c8eb317fed.c000.gz.parquet' [7.65MiB] into '/tmp/2024-10-07.i11ad2j8.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-07.ygm57xmh.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=07/part-00079-e6af13d6-a0f9-4d3b-8e7b-fa5c38cb5daf.c000.gz.parquet' [7.59MiB] into '/tmp/2024-10-07.ygm57xmh.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-14.u5l__4_e.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=14/part-00012-329ee1d9-0097-41c3-9cc8-a655f8df6ff9.c000.gz.parquet' [42.43MiB] into '/tmp/2024-10-14.u5l__4_e.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-14.ght6zqtm.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=14/part-00000-098d9968-58f1-4fea-ad3c-ba97515cba8c.c000.gz.parquet' [7.57MiB] into '/tmp/2024-10-14.ght6zqtm.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-14.dheqnsfy.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=14/part-00077-9fab0adc-2449-45cb-97e6-6c1b72afc2a7.c000.gz.parquet' [7.57MiB] into '/tmp/2024-10-14.dheqnsfy.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-21.hhxav3jm.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=21/part-00006-5db8a29e-1e0f-4871-8271-b6bf32260b3d.c000.gz.parquet' [42.40MiB] into '/tmp/2024-10-21.hhxav3jm.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-21.qigyp12s.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=21/part-00002-20075ec6-e3e9-4921-a837-c75519fd15e7.c000.gz.parquet' [7.56MiB] into '/tmp/2024-10-21.qigyp12s.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-21.0tg0on3a.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=21/part-00020-ddb48036-b9c3-43dd-9f23-a879cd81c791.c000.gz.parquet' [7.56MiB] into '/tmp/2024-10-21.0tg0on3a.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-28.sq6txvur.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=li/year=2024/month=10/day=28/part-00009-99b37179-bfb8-42ce-b54e-2e35b9d1146b.c000.gz.parquet' [42.43MiB] into '/tmp/2024-10-28.sq6txvur.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-28.p6dp9mi1.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=28/part-00003-6af422bf-61ae-44d9-a1f5-560ab39a8744.c000.gz.parquet' [7.72MiB] into '/tmp/2024-10-28.p6dp9mi1.gz.parquet'.
Opened temporary file for object download: '/tmp/2024-10-28.6y2nxagi.gz.parquet'.
Downloaded 'openintel-public/fdns/basis=zonefile/source=gov/year=2024/month=10/day=28/part-00057-14e5aea2-df32-45ce-a3dd-2524ce04e479.c000.gz.parquet' [7.71MiB] into '/tmp/2024-10-28.6y2nxagi.gz.parquet'.
done: read 5557482 records

Simple example aggregate

Number of registered domains per (source, date) partition

In [6]:
# Count number of registered domains per source per day
pandas_df.query(
    # We know that the data only contains SOA response_type for the @ (i.e., registered domain)
    "response_type == 'SOA'"
# Group by (source, date) partition
).groupby(
    ["source", "year", "month", "day"]
# Count the number of names for the aggregate key
).agg(
    domain_count = ('query_name', 'count')
)
Out[6]:
                       domain_count
source year month day
gov    2024 10    7          25763
                  14         25917
                  21         26079
                  28         26209
li     2024 10    7          85159
                  14         85289
                  21         85287
                  28         85259
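
As a hedged follow-up (not in the original notebook; agg_pdf is a hypothetical name), the multi-indexed aggregate can be reshaped into a date-indexed time series, e.g., for plotting:

# Hypothetical follow-up: reshape the aggregate into a date-indexed time series
agg_pdf = pandas_df.query("response_type == 'SOA'").groupby(
    ["source", "year", "month", "day"]
).agg(domain_count=("query_name", "count")).reset_index()
agg_pdf["date"] = pd.to_datetime(agg_pdf[["year", "month", "day"]])
print(agg_pdf.pivot(index="date", columns="source", values="domain_count"))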

Approach 2: Read OpenINTEL data (public) with Apache Spark (PySpark)

Spark Configuration

In [7]:
# Create Spark Config
sparkConf = SparkConf()
sparkConf.setMaster("local[1]")
sparkConf.setAppName("pyspark-{}-{}".format(os.getuid(), int(time.time())))
# executors
sparkConf.set("spark.executor.instances", "1")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.executor.memory", "4G")
sparkConf.set("spark.executor.memoryOverhead", "512M")
# driver
sparkConf.set("spark.driver.cores", "1")
sparkConf.set("spark.driver.memory", "2G")

# OpenINTEL object storage (S3A) settings
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sparkConf.set("fs.s3a.endpoint", OI_ENDPOINT)
sparkConf.set("fs.s3a.connection.ssl.enabled", "true")
sparkConf.set("fs.s3a.signing-algorithm", "S3SignerType")
sparkConf.set("fs.s3a.path.style.access", "true")
sparkConf.set("fs.s3a.block.size", "16M")
sparkConf.set("fs.s3a.readahead.range", "1M")
sparkConf.set("fs.s3a.experimental.input.fadvise", "normal")
sparkConf.set("io.file.buffer.size", "67108864")
sparkConf.set("spark.buffer.size", "67108864")

# Parquet I/O performance settings for cloud integration
sparkConf.set("spark.hadoop.parquet.summary.metadata.level", "NONE")
sparkConf.set("spark.sql.parquet.mergeSchema", "false")
sparkConf.set("spark.sql.parquet.filterPushdown", "true")
sparkConf.set("spark.sql.hive.metastorePartitionPruning", "true")

print("SparkConf created")
SparkConf created

Create SparkContext

In [8]:
# Initialize our Spark Session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
print("Started SparkContext")
Started SparkContext

Configure data of interest

In [9]:
# The date(s) to download
DO_DATES = list(dateutil.rrule.rrule(dateutil.rrule.WEEKLY, dtstart=datetime.datetime(2024, 10, 7), until=datetime.datetime(2024, 10, 28)))

# The public source(s) to download (see index: https://openintel.nl/download/forward-dns/)
DO_SOURCES = {
    OI_FDNS_ZONEBASED : [
        "li",
        "gov"
    ],
    OI_FDNS_LISTBASED : [
        #"tranco"
    ]   
}

Define Spark DataFrame

In [10]:
my_oi_fdns_df = spark.read.option(
    "basePath", "s3a://{}/{}/".format(OI_BUCKET_NAME, OI_FDNS_BASE)
).parquet(
    *[
        # All the partition paths for the provided sources and dates
        os.path.join(
            "s3a://{}/{}".format(OI_BUCKET_NAME, i_fdns_source_base),
            "source={}".format(i_source),
            "year={}".format(i_date.year),
            "month={:02d}".format(i_date.month),
            "day={:02d}".format(i_date.day)
        )
        for i_fdns_source_base in DO_SOURCES.keys()
        for i_source in DO_SOURCES[i_fdns_source_base]
        for i_date in DO_DATES
    ]
)
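
Note that this only defines the DataFrame; Spark reads no data until an action runs. To verify the available columns (compare the data dictionary linked earlier), one can, for example, print the schema; the partition columns source, year, month and day are derived from the paths:

# Inspect the schema; no data is read for this
my_oi_fdns_df.printSchema()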

Simple example aggregate (similar to above)

In [12]:
my_oi_fdns_df.filter(
    # We know that the data only contains SOA response_type for the @ (i.e., registered domain)
    psf.col("response_type") == "SOA"
).groupby(
    # Group by (source, date) partition
    ["source", "year", "month", "day"]
).agg(
    # Count the number of names for the aggregate key
    psf.count("query_name").alias("domain_count")
).orderBy("source", "year", "month", "day").show(10)
+------+----+-----+---+------------+
|source|year|month|day|domain_count|
+------+----+-----+---+------------+
|   gov|2024|   10|  7|       25763|
|   gov|2024|   10| 14|       25917|
|   gov|2024|   10| 21|       26079|
|   gov|2024|   10| 28|       26209|
|    li|2024|   10|  7|       85159|
|    li|2024|   10| 14|       85289|
|    li|2024|   10| 21|       85287|
|    li|2024|   10| 28|       85259|
+------+----+-----+---+------------+
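
Since the aggregate is tiny (eight rows here), it can also be collected into pandas for plotting or export; a minimal sketch (agg_pdf is a hypothetical name, not part of this notebook):

# Optional: collect the (small) aggregate into a Pandas DataFrame
agg_pdf = my_oi_fdns_df.filter(
    psf.col("response_type") == "SOA"
).groupby("source", "year", "month", "day").agg(
    psf.count("query_name").alias("domain_count")
).toPandas()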

Stop SparkContext

In [13]:
sc.stop()
print("Stopped SparkContext")
Stopped SparkContext