RIR-data.org -- PySpark notebook

Author: Mattijs Jonker <m.jonker[at]utwente.nl>

License of this notebook: CC BY-SA 4.0

Example code to load public RIR-level data, using the AWS SDK or PySpark.

Before using our data, please read our terms of use here

CHANGELOG:

  • v1.0.0 -- Initial version of notebook

Imports¶

In [1]:
import os
import datetime
import fnmatch
import tempfile
import time
import dateutil.rrule

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as psf

import boto3
import botocore
import pandas as pd
import bz2
import json

Global Configuration¶

In [2]:
# The OpenINTEL / RIR-data.org repository endpoint (S3-capable)
S3_ENDPOINT = "https://data.rir-data.org"

# The RIR-data bucket and data type bases
RIR_DATA_BUCKET_NAME = "rir-data"
RIR_DATA_RDNS_BASE = "rirs-rdns-formatted/type=enriched"
RIR_DATA_WHOIS_BASE = "whois-formatted/type=enriched"

Configure date(s) of interest¶

In [3]:
# The date(s) to download
DO_DATES = list(dateutil.rrule.rrule(
    dateutil.rrule.DAILY,
    dtstart=datetime.datetime(2022, 11, 1), # The earliest date partition in RIR_DATA_WHOIS_BASE is 2022-11-01 and in RIR_DATA_RDNS_BASE 2022-10-12
    count=1 # Take n days forward; alternatively, bound the range with until=datetime.datetime(...)
))
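
If you prefer a bounded, multi-day range over counting days forward, rrule also accepts an until argument. The cell below is an unexecuted sketch with arbitrary example bounds; running it would replace the single-day DO_DATES defined above.

In [ ]:
# Hypothetical alternative: all daily partitions between two (inclusive) bounds
DO_DATES = list(dateutil.rrule.rrule(
    dateutil.rrule.DAILY,
    dtstart=datetime.datetime(2022, 11, 1),
    until=datetime.datetime(2022, 11, 7)
))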

Read RIR-level data with the Amazon Web Services (AWS) SDK for Python (boto3) and Pandas¶

Step 1: Define boto3 Resource and Client¶

In [4]:
# Get a boto Resource
S3R = boto3.resource(
    "s3",
    "nl-utwente",
    endpoint_url          = S3_ENDPOINT,
    # We're using anonymous access
    config = botocore.config.Config(
        signature_version = botocore.UNSIGNED,
    )
)

# Get its client, for lower-level actions, if needed
S3C = S3R.meta.client

# Prevent some requests from going to AWS instead of our server
S3C.meta.events.unregister('before-sign.s3', botocore.utils.fix_s3_host)

# The RIR-data.org data bucket
S3_BUCKET = S3R.Bucket(RIR_DATA_BUCKET_NAME)
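
If you want to verify connectivity or see which date partitions are available before downloading anything, you can list a few object keys under one of the bases; a minimal, unexecuted sketch using the bucket resource defined above:

In [ ]:
# Hypothetical example: anonymously list a handful of object keys (and sizes)
# under the rDNS base to see which date partitions exist
for i_obj in S3_BUCKET.objects.filter(Prefix=RIR_DATA_RDNS_BASE).limit(10):
    print(i_obj.key, i_obj.size)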

Step 2: Read data into Pandas DataFrame¶

Step 2a: RIR-level rDNS zonefile data Pandas DataFrame¶

note: this example downloads each object into a temporary file; set DELETE_TEMP_OBJECTS=False if you want to retain the downloaded objects locally.

In [5]:
DELETE_TEMP_OBJECTS = True

# Iterate DO_DATES
json_records = []
for i_date in DO_DATES:

    # Iterate objects in date partition (typically one object)
    for i_obj in S3_BUCKET.objects.filter(
        
        # Build a partition path for the given date partition
        Prefix=os.path.join(
            RIR_DATA_RDNS_BASE,
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    ):

        # Open a temporary file to download the object into
        with tempfile.NamedTemporaryFile(mode="w+b", dir="/tmp", prefix="{}.".format(i_date.date().isoformat()), suffix=".jsonl.bz2", delete=DELETE_TEMP_OBJECTS) as tempFile:
    
            print("Opened temporary file for object download: '{}'.".format(tempFile.name))
            S3_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tempFile, Config=boto3.s3.transfer.TransferConfig(multipart_chunksize = 64*1024*1024)) # Too small a chunk size may lead to requests hitting the rate limiter
            tempFile.flush()
            print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(
                os.path.join(S3_BUCKET.name, i_obj.key),
                os.path.getsize(tempFile.name) / (1024*1024),
                tempFile.name
            ))

            # Decompress and parse the JSONL object; a line can hold a single record or a list of records
            with bz2.open(tempFile.name, "r") as tempFile2:
                for i_line in tempFile2:
                    _jl = json.loads(i_line)
                    if isinstance(_jl, dict):
                        json_records.append(_jl)
                    elif isinstance(_jl, list):
                        json_records.extend(_jl)
                    
# Create Pandas DF using data records from objects
my_rir_data_rdns_pdf = pd.DataFrame(json_records)
print("Created Pandas DataFrame with {} records".format(len(my_rir_data_rdns_pdf)))
my_rir_data_rdns_pdf.head(5)
Opened temporary file for object download: '/tmp/2022-11-01.vn2onoxd.jsonl.bz2'.
Downloaded 'rir-data/rirs-rdns-formatted/type=enriched/year=2022/month=11/day=01/hour=00/all_rdns__pytricia_2022110100_2022110123_without_RRSIG_NSEC_DNSKEY.jsonl.bz2' [10.43MiB] into '/tmp/2022-11-01.vn2onoxd.jsonl.bz2'.
Created Pandas DataFrame with 1301480 records
Out[5]:
       prefixes  host_bit start_address    end_address  adjusted  rfc_2317   timestamp    source  af                                               rdns
0   [1.0.0.0/8]     False       1.0.0.0  1.255.255.255      True     False  1667322000  INTERNIC   4  {'name': ['1.in-addr.arpa.'], 'origin': ['in-a...
1  [1.0.0.0/24]     False       1.0.0.0      1.0.0.255      True     False  1667340000     APNIC   4  {'name': ['0.0.1.in-addr.arpa.'], 'origin': ['...
2  [1.0.4.0/24]     False       1.0.4.0      1.0.4.255      True     False  1667340000     APNIC   4  {'name': ['4.0.1.in-addr.arpa.'], 'origin': ['...
3  [1.0.5.0/24]     False       1.0.5.0      1.0.5.255      True     False  1667340000     APNIC   4  {'name': ['5.0.1.in-addr.arpa.'], 'origin': ['...
4  [1.0.6.0/24]     False       1.0.6.0      1.0.6.255      True     False  1667340000     APNIC   4  {'name': ['6.0.1.in-addr.arpa.'], 'origin': ['...
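
As a quick sanity check on the loaded DataFrame, you could for example count delegation records per RIR source; a minimal, unexecuted sketch:

In [ ]:
# Hypothetical example: number of rDNS delegation records per source
my_rir_data_rdns_pdf.groupby("source").size().sort_values(ascending=False)
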
Step 2b: RIR-level WHOIS data Pandas DataFrame¶

notes:

  1. this example downloads each object into a temporary file; set DELETE_TEMP_OBJECTS=False if you want to retain the downloaded objects locally
  2. each partition of RIR-data.org WHOIS data is approximately 145MiB. Loading more than a handful of partitions therefore requires a lot of memory. Consider processing the data one date partition at a time (if possible) and combining the results in a later step; a sketch of this approach follows the example output below.
In [6]:
DELETE_TEMP_OBJECTS = True

# Iterate DO_DATES
json_records = []
for i_date in DO_DATES:

    # Iterate objects in date partition (typically one object)
    for i_obj in S3_BUCKET.objects.filter(
        
        # Build a partition path for the given date partition
        Prefix=os.path.join(
            RIR_DATA_WHOIS_BASE,
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    ):

        # Open a temporary file to download the object into
        with tempfile.NamedTemporaryFile(mode="w+b", dir="/tmp", prefix="{}.".format(i_date.date().isoformat()), suffix=".jsonl.bz2", delete=DELETE_TEMP_OBJECTS) as tempFile:
    
            print("Opened temporary file for object download: '{}'.".format(tempFile.name))
            S3_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tempFile, Config=boto3.s3.transfer.TransferConfig(multipart_chunksize = 64*1024*1024)) # Too small a chunk size may lead to requests hitting the rate limiter
            tempFile.flush()
            print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(
                os.path.join(S3_BUCKET.name, i_obj.key),
                os.path.getsize(tempFile.name) / (1024*1024),
                tempFile.name
            ))

            # Decompress and parse the JSONL object, one record per line
            with bz2.open(tempFile.name, "r") as tempFile2:
                for i_line in tempFile2:
                    json_records.append(json.loads(i_line))
                    
# Create Pandas DF using data records from objects
my_rir_data_whois_pdf = pd.DataFrame(json_records)
print("Created Pandas DataFrame with {} records".format(len(my_rir_data_whois_pdf)))
my_rir_data_whois_pdf.head(5)
Opened temporary file for object download: '/tmp/2022-11-01.5iqjg1ly.jsonl.bz2'.
Downloaded 'rir-data/whois-formatted/type=enriched/year=2022/month=11/day=01/hour=20/all_objects_pytricia_inetnum_2022110100_2022110123.jsonl.bz2' [142.19MiB] into '/tmp/2022-11-01.5iqjg1ly.jsonl.bz2'.
Created Pandas DataFrame with 6193804 records
Out[6]:
serial use_route prefixes af start_address end_address netname descr country status mnt-by created last-modified source related_delegation_prefix_match origin
0 1246136 False [0.0.0.0/0] 4 0.0.0.0 255.255.255.255 IANA-BLK The whole IPv4 address space EU ALLOCATED UNSPECIFIED AFRINIC-HM-MNT 991094400 1107561600 AFRINIC None NaN
1 54504831 False [0.0.0.0/8, 1.0.0.0/9, 1.128.0.0/11, 1.160.0.0... 4 0.0.0.0 1.178.111.255 NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK IPv4 address block not managed by the RIPE NCC EU ALLOCATED UNSPECIFIED RIPE-NCC-HM-MNT 1638804396 1638804396 RIPE None NaN
2 10565151 False [1.0.0.0/8] 4 1.0.0.0 1.255.255.255 APNIC-AP Asia Pacific Network Information Centre Region... AU ALLOCATED PORTABLE APNIC-HM 1528864175 1528864175 APNIC None NaN
3 54504831 False [0.0.0.0/8, 1.0.0.0/9, 1.128.0.0/11, 1.160.0.0... 4 0.0.0.0 1.178.111.255 NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK IPv4 address block not managed by the RIPE NCC EU ALLOCATED UNSPECIFIED RIPE-NCC-HM-MNT 1638804396 1638804396 RIPE None NaN
4 10565151 False [1.0.0.0/24] 4 1.0.0.0 1.0.0.255 APNIC-LABS APNIC and Cloudflare DNS Resolver project Rout... AU ASSIGNED PORTABLE APNIC-HM 1594818657 1594818657 APNIC None NaN
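
The per-date processing approach mentioned in the notes above could, for instance, look like the following unexecuted sketch. The aggregation chosen here (counting objects per source) is only an arbitrary example; substitute whatever reduction your analysis needs.

In [ ]:
# Hypothetical sketch: process one date partition at a time and combine the
# (much smaller) per-date results afterwards, instead of keeping all raw
# records in memory at once.
per_date_results = []
for i_date in DO_DATES:
    date_records = []
    for i_obj in S3_BUCKET.objects.filter(
        Prefix=os.path.join(
            RIR_DATA_WHOIS_BASE,
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    ):
        with tempfile.NamedTemporaryFile(mode="w+b", dir="/tmp", suffix=".jsonl.bz2") as tempFile:
            S3_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tempFile)
            tempFile.flush()
            with bz2.open(tempFile.name, "r") as f:
                date_records.extend(json.loads(i_line) for i_line in f)

    # Reduce this date's records to a small per-date result (example: object counts per source)
    i_pdf = pd.DataFrame(date_records)
    per_date_results.append(i_pdf.groupby("source").size().rename(i_date.date().isoformat()))

# Combine the small per-date aggregates into a single DataFrame
my_whois_counts_pdf = pd.concat(per_date_results, axis=1)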

Read RIR-level data into Spark DataFrames¶

Step 1: Spark Configuration¶

In [7]:
# Create Spark Config
NUM_THREADS = 1 # Increase if possible

sparkConf = SparkConf()
sparkConf.setMaster(f"local[{NUM_THREADS}]") # Run Spark locally with NUM_THREADS worker threads
sparkConf.setAppName("pyspark-{}-{}".format(os.getuid(), int(time.time())))
# executors
sparkConf.set("spark.executor.instances", "1")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.executor.memory", "4G")
sparkConf.set("spark.executor.memoryOverhead", "512M")
# driver
sparkConf.set("spark.driver.cores", "1")
sparkConf.set("spark.driver.memory", "2G")

# RIR-data.org Object Storage settings
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sparkConf.set("fs.s3a.endpoint", S3_ENDPOINT)
sparkConf.set("fs.s3a.connection.ssl.enabled", "true")
sparkConf.set("fs.s3a.signing-algorithm", "S3SignerType")
sparkConf.set("fs.s3a.path.style.access", "true")
sparkConf.set("fs.s3a.block.size", "64M")
sparkConf.set("fs.s3a.readahead.range", "1M")
sparkConf.set("fs.s3a.experimental.input.fadvise", "normal")
sparkConf.set("io.file.buffer.size", "67108864")
sparkConf.set("spark.buffer.size", "67108864")

# Parquet I/O performance settings for cloud integration
sparkConf.set("spark.hadoop.parquet.summary.metadata.level", "NONE")
sparkConf.set("spark.sql.parquet.mergeSchema", "false")
sparkConf.set("spark.sql.parquet.filterPushdown", "true")
sparkConf.set("spark.sql.hive.metastorePartitionPruning", "true")

print("SparkConf created")
SparkConf created
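
Note that reading s3a:// paths requires the Hadoop S3A connector (hadoop-aws and its bundled AWS SDK) on the Spark classpath. Depending on your Spark distribution, you may have to pull it in yourself, for example via spark.jars.packages before creating the session; the coordinates below are only an example and the version should match your Hadoop version.

In [ ]:
# Hypothetical example: fetch the S3A connector at session start-up.
# Match the hadoop-aws version to the Hadoop version bundled with your Spark.
sparkConf.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")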

Step 2: Create SparkContext¶

In [8]:
# Initialize our Spark Session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
print("Started SparkContext")
Started SparkContext

Step 3: Define Spark DataFrame¶

Step 3a: RIR-level rDNS zonefile data DataFrame¶
In [9]:
my_rir_data_rdns_df = spark.read.option("basePath", "s3a://{}/{}/".format(S3_BUCKET.name, RIR_DATA_RDNS_BASE)).format("json").load(
    [
        # All the partition paths for the provided dates (passed as a list; unpacking them as separate arguments would break for multiple dates)
        os.path.join("s3a://{}/{}".format(S3_BUCKET.name, RIR_DATA_RDNS_BASE), "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day))
        for i_date in DO_DATES
    ]
)
Show the data structure of the RIR-level rDNS data¶
In [10]:
my_rir_data_rdns_df.printSchema()
root
 |-- adjusted: boolean (nullable = true)
 |-- af: long (nullable = true)
 |-- end_address: string (nullable = true)
 |-- host_bit: boolean (nullable = true)
 |-- prefixes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rdns: struct (nullable = true)
 |    |-- name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- origin: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- rdatasets: struct (nullable = true)
 |    |    |-- A: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- AAAA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- CNAME: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- DS: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- NS: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- SOA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- rdclass: string (nullable = true)
 |    |-- ttl: long (nullable = true)
 |-- rfc_2317: boolean (nullable = true)
 |-- source: string (nullable = true)
 |-- start_address: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)

Show a few example rows¶
In [11]:
my_rir_data_rdns_df.show(5, truncate=64)
+--------+---+-------------+--------+------------+----------------------------------------------------------------+--------+--------+-------------+----------+----+-----+---+----+
|adjusted| af|  end_address|host_bit|    prefixes|                                                            rdns|rfc_2317|  source|start_address| timestamp|year|month|day|hour|
+--------+---+-------------+--------+------------+----------------------------------------------------------------+--------+--------+-------------+----------+----+-----+---+----+
|    true|  4|1.255.255.255|   false| [1.0.0.0/8]|{[1.in-addr.arpa.], [in-addr.arpa.], {NULL, NULL, NULL, [1551...|   false|INTERNIC|      1.0.0.0|1667322000|2022|   11|  1|   0|
|    true|  4|    1.0.0.255|   false|[1.0.0.0/24]|{[0.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.0.0|1667340000|2022|   11|  1|   0|
|    true|  4|    1.0.4.255|   false|[1.0.4.0/24]|{[4.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.4.0|1667340000|2022|   11|  1|   0|
|    true|  4|    1.0.5.255|   false|[1.0.5.0/24]|{[5.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.5.0|1667340000|2022|   11|  1|   0|
|    true|  4|    1.0.6.255|   false|[1.0.6.0/24]|{[6.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.6.0|1667340000|2022|   11|  1|   0|
+--------+---+-------------+--------+------------+----------------------------------------------------------------+--------+--------+-------------+----------+----+-----+---+----+
only showing top 5 rows
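
Beyond inspecting rows, you can of course query the DataFrame directly. The unexecuted cell below gives two hypothetical examples: an aggregation per source, and a selection that reaches into the nested rdns struct (using the pyspark.sql.functions import, psf, from above).

In [ ]:
# Hypothetical example 1: number of rDNS delegation records per source
my_rir_data_rdns_df.groupBy("source").agg(
    psf.count("*").alias("n_delegations")
).orderBy(psf.desc("n_delegations")).show()

# Hypothetical example 2: select fields from the nested rdns struct
my_rir_data_rdns_df.select(
    "prefixes",
    psf.col("rdns.name").alias("zone_name"),
    psf.col("rdns.rdatasets.NS").alias("ns_records")
).show(5, truncate=48)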

Step 3b: RIR-level WHOIS data DataFrame¶

note: each partition of RIR-data.org WHOIS data is approximately 145MiB. For a longitudinal analysis, we therefore recommend being mindful of the memory requirements of your Spark application. You can also process the data one partition at a time, for example by running your analysis on each date separately (if possible) and combining the results in a later step.

In [12]:
my_rir_data_whois_df = spark.read.option("basePath", "s3a://{}/{}/".format(S3_BUCKET.name, RIR_DATA_WHOIS_BASE)).format("json").load(
    [
        # All the partition paths for the provided dates (passed as a list; unpacking them as separate arguments would break for multiple dates)
        os.path.join("s3a://{}/{}".format(S3_BUCKET.name, RIR_DATA_WHOIS_BASE), "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day))
        for i_date in DO_DATES
    ]
)
Show the data structure of the RIR-level WHOIS data¶
In [13]:
my_rir_data_whois_df.printSchema()
root
 |-- af: long (nullable = true)
 |-- country: string (nullable = true)
 |-- created: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- end_address: string (nullable = true)
 |-- last-modified: string (nullable = true)
 |-- mnt-by: string (nullable = true)
 |-- netname: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- prefixes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- related_delegation_prefix_match: string (nullable = true)
 |-- serial: long (nullable = true)
 |-- source: string (nullable = true)
 |-- start_address: string (nullable = true)
 |-- status: string (nullable = true)
 |-- use_route: boolean (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)

Show a few example rows¶
In [14]:
my_rir_data_whois_df.show(3, truncate=32)
+---+-------+----------+--------------------------------+---------------+-------------+---------------+--------------------------------+------+--------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+
| af|country|   created|                           descr|    end_address|last-modified|         mnt-by|                         netname|origin|                        prefixes|related_delegation_prefix_match|  serial| source|start_address|               status|use_route|year|month|day|hour|
+---+-------+----------+--------------------------------+---------------+-------------+---------------+--------------------------------+------+--------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+
|  4|     EU| 991094400|    The whole IPv4 address space|255.255.255.255|   1107561600| AFRINIC-HM-MNT|                        IANA-BLK|  NULL|                     [0.0.0.0/0]|                           NULL| 1246136|AFRINIC|      0.0.0.0|ALLOCATED UNSPECIFIED|    false|2022|   11|  1|  20|
|  4|     EU|1638804396|IPv4 address block not manage...|  1.178.111.255|   1638804396|RIPE-NCC-HM-MNT|NON-RIPE-NCC-MANAGED-ADDRESS-...|  NULL|[0.0.0.0/8, 1.0.0.0/9, 1.128....|                           NULL|54504831|   RIPE|      0.0.0.0|ALLOCATED UNSPECIFIED|    false|2022|   11|  1|  20|
|  4|     AU|1528864175|Asia Pacific Network Informat...|  1.255.255.255|   1528864175|       APNIC-HM|                        APNIC-AP|  NULL|                     [1.0.0.0/8]|                           NULL|10565151|  APNIC|      1.0.0.0|   ALLOCATED PORTABLE|    false|2022|   11|  1|  20|
+---+-------+----------+--------------------------------+---------------+-------------+---------------+--------------------------------+------+--------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+
only showing top 3 rows
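
As with the rDNS DataFrame, you can query the WHOIS DataFrame directly; a minimal, unexecuted example that counts WHOIS objects per RIR and address family:

In [ ]:
# Hypothetical example: number of WHOIS objects per source and address family
my_rir_data_whois_df.groupBy("source", "af").count().orderBy("source", "af").show()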

Stop SparkContext¶

In [15]:
sc.stop()
print("Stopped SparkContext")
Stopped SparkContext