RIR-data.org -- PySpark notebook
Author: Mattijs Jonker <m.jonker[at]utwente.nl>
License of this notebook: CC BY-SA 4.0
Example code to load public RIR-level data, using the AWS SDK or PySpark.
Before using our data, please read our terms of use here
CHANGELOG:
- v1.0.0 -- Initial version of notebook
Imports¶
import os
import datetime
import fnmatch
import tempfile
import time
import dateutil.rrule
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as psf
import boto3
import botocore
import pandas as pd
import bz2
import json
Global Configuration¶
# The OpenINTEL / RIR-data.org repository endpoint (S3-capable)
S3_ENDPOINT = "https://data.rir-data.org"
# The RIR-data bucket and data type bases
RIR_DATA_BUCKET_NAME = "rir-data"
RIR_DATA_RDNS_BASE = "rirs-rdns-formatted/type=enriched"
RIR_DATA_WHOIS_BASE = "whois-formatted/type=enriched"
Configure date(s) of interest¶
# The date(s) to download
DO_DATES = list(dateutil.rrule.rrule(
    dateutil.rrule.DAILY,
    dtstart=datetime.datetime(2022, 11, 1),  # The earliest date partition in RIR_DATA_WHOIS_BASE is 2022-11-01 and in RIR_DATA_RDNS_BASE 2022-10-12
    count=1  # Take n days forward; alternatively, use until=datetime.datetime(...)
))
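For a longer range, rrule also accepts an until= end date instead of count=; the commented-out lines below are a minimal sketch (the 2022-11-07 end date is only an example).
# Alternative: take all days from 2022-11-01 through 2022-11-07 (inclusive)
# DO_DATES = list(dateutil.rrule.rrule(
#     dateutil.rrule.DAILY,
#     dtstart=datetime.datetime(2022, 11, 1),
#     until=datetime.datetime(2022, 11, 7)
# ))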
Read RIR-level data with the Amazon Web Services (AWS) SDK for Python (boto3) and Pandas¶
Step 1: Define boto3 Resource and Client¶
# Get a boto Resource
S3R = boto3.resource(
    "s3",
    "nl-utwente",
    endpoint_url=S3_ENDPOINT,
    # We're using anonymous access
    config=botocore.config.Config(
        signature_version=botocore.UNSIGNED,
    )
)
# Get its client, for lower-level actions, if needed
S3C = S3R.meta.client
# Prevent some requests from going to AWS instead of our server
S3C.meta.events.unregister('before-sign.s3', botocore.utils.fix_s3_host)
# The RIR-data data bucket
S3_BUCKET = S3R.Bucket(RIR_DATA_BUCKET_NAME)
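To see which date partitions are available before downloading anything, you can list a few object keys under one of the data type bases. A minimal sketch using the bucket defined above (the year=2022 prefix is just an example):
# List a handful of object keys under the rDNS base, to inspect the available date partitions
for i_obj in S3_BUCKET.objects.filter(Prefix=os.path.join(RIR_DATA_RDNS_BASE, "year=2022")).limit(5):
    print(i_obj.key)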
Step 2: Read data into Pandas DataFrame¶
Step 2a: RIR-level rDNS zonefile data Pandas DataFrame¶
note: this example uses a temporary file; you can choose to retain the downloaded objects locally by setting DELETE_TEMP_OBJECTS=False.
DELETE_TEMP_OBJECTS = True
# Iterate DO_DATES
json_records = []
for i_date in DO_DATES:
    # Iterate objects in date partition (typically one object)
    for i_obj in S3_BUCKET.objects.filter(
        # Build a partition path for the given date partition
        Prefix=os.path.join(
            RIR_DATA_RDNS_BASE,
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    ):
        # Open a temporary file to download the object into
        with tempfile.NamedTemporaryFile(mode="w+b", dir="/tmp", prefix="{}.".format(i_date.date().isoformat()), suffix=".jsonl.bz2", delete=DELETE_TEMP_OBJECTS) as tempFile:
            print("Opened temporary file for object download: '{}'.".format(tempFile.name))
            S3_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tempFile, Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=64*1024*1024))  # A too-low chunksize might lead to requests hitting the rate-limiter
            tempFile.flush()
            print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(
                os.path.join(S3_BUCKET.name, i_obj.key),
                os.path.getsize(tempFile.name) / (1024*1024),
                tempFile.name
            ))
            # Decompress and parse the JSON lines into records
            with bz2.open(tempFile.name, "r") as tempFile2:
                for i_line in tempFile2:
                    _jl = json.loads(i_line)
                    if isinstance(_jl, dict):
                        json_records.append(_jl)
                    elif isinstance(_jl, list):
                        json_records.extend(_jl)
# Create Pandas DF using data records from objects
my_rir_data_rdns_pdf = pd.DataFrame(json_records)
print("Created Pandas DataFrame with {} records".format(len(my_rir_data_rdns_pdf)))
my_rir_data_rdns_pdf.head(5)
Opened temporary file for object download: '/tmp/2022-11-01.vn2onoxd.jsonl.bz2'.
Downloaded 'rir-data/rirs-rdns-formatted/type=enriched/year=2022/month=11/day=01/hour=00/all_rdns__pytricia_2022110100_2022110123_without_RRSIG_NSEC_DNSKEY.jsonl.bz2' [10.43MiB] into '/tmp/2022-11-01.vn2onoxd.jsonl.bz2'.
Created Pandas DataFrame with 1301480 records
| | prefixes | host_bit | start_address | end_address | adjusted | rfc_2317 | timestamp | source | af | rdns |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | [1.0.0.0/8] | False | 1.0.0.0 | 1.255.255.255 | True | False | 1667322000 | INTERNIC | 4 | {'name': ['1.in-addr.arpa.'], 'origin': ['in-a... |
| 1 | [1.0.0.0/24] | False | 1.0.0.0 | 1.0.0.255 | True | False | 1667340000 | APNIC | 4 | {'name': ['0.0.1.in-addr.arpa.'], 'origin': ['... |
| 2 | [1.0.4.0/24] | False | 1.0.4.0 | 1.0.4.255 | True | False | 1667340000 | APNIC | 4 | {'name': ['4.0.1.in-addr.arpa.'], 'origin': ['... |
| 3 | [1.0.5.0/24] | False | 1.0.5.0 | 1.0.5.255 | True | False | 1667340000 | APNIC | 4 | {'name': ['5.0.1.in-addr.arpa.'], 'origin': ['... |
| 4 | [1.0.6.0/24] | False | 1.0.6.0 | 1.0.6.255 | True | False | 1667340000 | APNIC | 4 | {'name': ['6.0.1.in-addr.arpa.'], 'origin': ['... |
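As a quick sanity check on the loaded data, you could, for example, count records per RIR source; a minimal sketch:
# Number of rDNS records per source
print(my_rir_data_rdns_pdf["source"].value_counts())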
Step 2b: RIR-level WHOIS data Pandas DataFrame¶
notes:
- this example uses a temporary file; you can choose to retain the downloaded objects locally by setting DELETE_TEMP_OBJECTS=False
- each partition of RIR-data.org WHOIS data is approximately 145MiB, so loading more than a handful of partitions requires a lot of memory. Consider processing the data one date partition at a time (if possible) and combining the results in a later step; a minimal sketch of this approach follows the example rows below.
DELETE_TEMP_OBJECTS = True
# Iterate DO_DATES
json_records = []
for i_date in DO_DATES:
    # Iterate objects in date partition (typically one object)
    for i_obj in S3_BUCKET.objects.filter(
        # Build a partition path for the given date partition
        Prefix=os.path.join(
            RIR_DATA_WHOIS_BASE,
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    ):
        # Open a temporary file to download the object into
        with tempfile.NamedTemporaryFile(mode="w+b", dir="/tmp", prefix="{}.".format(i_date.date().isoformat()), suffix=".jsonl.bz2", delete=DELETE_TEMP_OBJECTS) as tempFile:
            print("Opened temporary file for object download: '{}'.".format(tempFile.name))
            S3_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tempFile, Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=64*1024*1024))  # A too-low chunksize might lead to requests hitting the rate-limiter
            tempFile.flush()
            print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(
                os.path.join(S3_BUCKET.name, i_obj.key),
                os.path.getsize(tempFile.name) / (1024*1024),
                tempFile.name
            ))
            # Decompress and parse the JSON lines into records
            with bz2.open(tempFile.name, "r") as tempFile2:
                for i_line in tempFile2:
                    json_records.append(json.loads(i_line))
# Create Pandas DF using data records from objects
my_rir_data_whois_pdf = pd.DataFrame(json_records)
print("Created Pandas DataFrame with {} records".format(len(my_rir_data_whois_pdf)))
my_rir_data_whois_pdf.head(5)
Opened temporary file for object download: '/tmp/2022-11-01.5iqjg1ly.jsonl.bz2'.
Downloaded 'rir-data/whois-formatted/type=enriched/year=2022/month=11/day=01/hour=20/all_objects_pytricia_inetnum_2022110100_2022110123.jsonl.bz2' [142.19MiB] into '/tmp/2022-11-01.5iqjg1ly.jsonl.bz2'.
Created Pandas DataFrame with 6193804 records
| | serial | use_route | prefixes | af | start_address | end_address | netname | descr | country | status | mnt-by | created | last-modified | source | related_delegation_prefix_match | origin |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1246136 | False | [0.0.0.0/0] | 4 | 0.0.0.0 | 255.255.255.255 | IANA-BLK | The whole IPv4 address space | EU | ALLOCATED UNSPECIFIED | AFRINIC-HM-MNT | 991094400 | 1107561600 | AFRINIC | None | NaN |
| 1 | 54504831 | False | [0.0.0.0/8, 1.0.0.0/9, 1.128.0.0/11, 1.160.0.0... | 4 | 0.0.0.0 | 1.178.111.255 | NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK | IPv4 address block not managed by the RIPE NCC | EU | ALLOCATED UNSPECIFIED | RIPE-NCC-HM-MNT | 1638804396 | 1638804396 | RIPE | None | NaN |
| 2 | 10565151 | False | [1.0.0.0/8] | 4 | 1.0.0.0 | 1.255.255.255 | APNIC-AP | Asia Pacific Network Information Centre Region... | AU | ALLOCATED PORTABLE | APNIC-HM | 1528864175 | 1528864175 | APNIC | None | NaN |
| 3 | 54504831 | False | [0.0.0.0/8, 1.0.0.0/9, 1.128.0.0/11, 1.160.0.0... | 4 | 0.0.0.0 | 1.178.111.255 | NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK | IPv4 address block not managed by the RIPE NCC | EU | ALLOCATED UNSPECIFIED | RIPE-NCC-HM-MNT | 1638804396 | 1638804396 | RIPE | None | NaN |
| 4 | 10565151 | False | [1.0.0.0/24] | 4 | 1.0.0.0 | 1.0.0.255 | APNIC-LABS | APNIC and Cloudflare DNS Resolver project Rout... | AU | ASSIGNED PORTABLE | APNIC-HM | 1594818657 | 1594818657 | APNIC | None | NaN |
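The per-date approach mentioned in the notes above could look roughly as follows. This is a minimal sketch that assumes a per-source record count is the only result you need to keep per date; adapt the aggregation step to your own analysis.
# Sketch: process WHOIS data one date partition at a time, keeping only small per-date aggregates
per_date_counts = {}
for i_date in DO_DATES:
    date_records = []
    for i_obj in S3_BUCKET.objects.filter(
        Prefix=os.path.join(
            RIR_DATA_WHOIS_BASE,
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    ):
        with tempfile.NamedTemporaryFile(mode="w+b", dir="/tmp", suffix=".jsonl.bz2") as tmp:
            S3_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tmp)
            tmp.flush()
            with bz2.open(tmp.name, "r") as fh:
                for i_line in fh:
                    date_records.append(json.loads(i_line))
    # Reduce this date partition to a small result (here: record counts per RIR source)
    if date_records:
        per_date_counts[i_date.date().isoformat()] = pd.DataFrame(date_records)["source"].value_counts()
    del date_records
# Combine the small per-date results in a later step
combined_counts = pd.DataFrame(per_date_counts)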
Read RIR-level data into Spark DataFrames¶
Step 1: Spark Configuration¶
# Create Spark Config
NUM_THREADS = 1 # Increase if possible
sparkConf = SparkConf()
sparkConf.setMaster(f"local[{NUM_THREADS}]")  # Run Spark locally, using NUM_THREADS worker threads
sparkConf.setAppName("pyspark-{}-{}".format(os.getuid(), int(time.time())))
# executors
sparkConf.set("spark.executor.instances", "1")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.executor.memory", "4G")
sparkConf.set("spark.executor.memoryOverhead", "512M")
# driver
sparkConf.set("spark.driver.cores", "1")
sparkConf.set("spark.driver.memory", "2G")
# RIR-data.org Object Storage settings
sparkConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkConf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sparkConf.set("fs.s3a.endpoint", S3_ENDPOINT)
sparkConf.set("fs.s3a.connection.ssl.enabled", "true")
sparkConf.set("fs.s3a.signing-algorithm", "S3SignerType")
sparkConf.set("fs.s3a.path.style.access", "true")
sparkConf.set("fs.s3a.block.size", "64M")
sparkConf.set("fs.s3a.readahead.range", "1M")
sparkConf.set("fs.s3a.experimental.input.fadvise", "normal")
sparkConf.set("io.file.buffer.size", "67108864")
sparkConf.set("spark.buffer.size", "67108864")
# Parquet I/O performance settings for cloud integration
sparkConf.set("spark.hadoop.parquet.summary.metadata.level", "NONE")
sparkConf.set("spark.sql.parquet.mergeSchema", "false")
sparkConf.set("spark.sql.parquet.filterPushdown", "true")
sparkConf.set("spark.sql.hive.metastorePartitionPruning", "true")
print("SparkConf created")
SparkConf created
Step 2: Create SparkContext¶
# Initialize our Spark Session
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext
print("Started SparkContext")
Started SparkContext
Step 3: Define Spark DataFrame¶
Step 3a: RIR-level rDNS zonefile data DataFrame¶
my_rir_data_rdns_df = spark.read.option("basePath", "s3a://{}/{}/".format(S3_BUCKET.name, RIR_DATA_RDNS_BASE)).format("json").load(
    *[
        # All the partition paths for the provided dates
        os.path.join(
            "s3a://{}/{}".format(S3_BUCKET.name, RIR_DATA_RDNS_BASE),
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
        for i_date in DO_DATES
    ]
)
Show the data structure of the RIR-level rDNS data¶
my_rir_data_rdns_df.printSchema()
root
 |-- adjusted: boolean (nullable = true)
 |-- af: long (nullable = true)
 |-- end_address: string (nullable = true)
 |-- host_bit: boolean (nullable = true)
 |-- prefixes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rdns: struct (nullable = true)
 |    |-- name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- origin: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- rdatasets: struct (nullable = true)
 |    |    |-- A: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- AAAA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- CNAME: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- DS: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- NS: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- SOA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- rdclass: string (nullable = true)
 |    |-- ttl: long (nullable = true)
 |-- rfc_2317: boolean (nullable = true)
 |-- source: string (nullable = true)
 |-- start_address: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
Show a few example rows¶
my_rir_data_rdns_df.show(5, truncate=64)
+--------+---+-------------+--------+------------+----------------------------------------------------------------+--------+--------+-------------+----------+----+-----+---+----+
|adjusted| af|  end_address|host_bit|    prefixes|                                                            rdns|rfc_2317|  source|start_address| timestamp|year|month|day|hour|
+--------+---+-------------+--------+------------+----------------------------------------------------------------+--------+--------+-------------+----------+----+-----+---+----+
|    true|  4|1.255.255.255|   false| [1.0.0.0/8]|{[1.in-addr.arpa.], [in-addr.arpa.], {NULL, NULL, NULL, [1551...|   false|INTERNIC|      1.0.0.0|1667322000|2022|   11|  1|   0|
|    true|  4|    1.0.0.255|   false|[1.0.0.0/24]|{[0.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.0.0|1667340000|2022|   11|  1|   0|
|    true|  4|    1.0.4.255|   false|[1.0.4.0/24]|{[4.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.4.0|1667340000|2022|   11|  1|   0|
|    true|  4|    1.0.5.255|   false|[1.0.5.0/24]|{[5.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.5.0|1667340000|2022|   11|  1|   0|
|    true|  4|    1.0.6.255|   false|[1.0.6.0/24]|{[6.0.1.in-addr.arpa.], [1.in-addr.arpa.], {NULL, NULL, NULL,...|   false|   APNIC|      1.0.6.0|1667340000|2022|   11|  1|   0|
+--------+---+-------------+--------+------------+----------------------------------------------------------------+--------+--------+-------------+----------+----+-----+---+----+
only showing top 5 rows
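The rdns column is a nested struct (see the schema above); its fields can be addressed with dot notation. A minimal sketch that selects the reverse zone names and their NS records:
# Select a few nested rDNS fields: the reverse zone name(s) and NS rdatasets
my_rir_data_rdns_df.select(
    psf.col("rdns.name").alias("zone"),
    psf.col("rdns.rdatasets.NS").alias("ns_records"),
    "source"
).show(5, truncate=64)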
Step 3b: RIR-level WHOIS data DataFrame¶
note: each partition of RIR-data.org WHOIS data is approximately 145MiB. For a longitudinal analysis, be mindful of the memory requirements of your Spark application. You can also process the data one partition at a time, for example by running your analysis on each date separately (if possible) and then combining the results in a later step; a minimal sketch of this approach follows the example rows below.
my_rir_data_whois_df = spark.read.option("basePath", "s3a://{}/{}/".format(S3_BUCKET.name, RIR_DATA_WHOIS_BASE)).format("json").load(
    *[
        # All the partition paths for the provided dates
        os.path.join(
            "s3a://{}/{}".format(S3_BUCKET.name, RIR_DATA_WHOIS_BASE),
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
        for i_date in DO_DATES
    ]
)
Show the data structure of the RIR-level WHOIS data¶
my_rir_data_whois_df.printSchema()
root
 |-- af: long (nullable = true)
 |-- country: string (nullable = true)
 |-- created: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- end_address: string (nullable = true)
 |-- last-modified: string (nullable = true)
 |-- mnt-by: string (nullable = true)
 |-- netname: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- prefixes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- related_delegation_prefix_match: string (nullable = true)
 |-- serial: long (nullable = true)
 |-- source: string (nullable = true)
 |-- start_address: string (nullable = true)
 |-- status: string (nullable = true)
 |-- use_route: boolean (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
Show a few example rows¶
my_rir_data_whois_df.show(3, truncate=32)
+---+-------+----------+--------------------------------+---------------+-------------+---------------+--------------------------------+------+--------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+
| af|country|   created|                           descr|    end_address|last-modified|         mnt-by|                         netname|origin|                        prefixes|related_delegation_prefix_match|  serial| source|start_address|               status|use_route|year|month|day|hour|
+---+-------+----------+--------------------------------+---------------+-------------+---------------+--------------------------------+------+--------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+
|  4|     EU| 991094400|    The whole IPv4 address space|255.255.255.255|   1107561600| AFRINIC-HM-MNT|                        IANA-BLK|  NULL|                     [0.0.0.0/0]|                           NULL| 1246136|AFRINIC|      0.0.0.0|ALLOCATED UNSPECIFIED|    false|2022|   11|  1|  20|
|  4|     EU|1638804396|IPv4 address block not manage...|  1.178.111.255|   1638804396|RIPE-NCC-HM-MNT|NON-RIPE-NCC-MANAGED-ADDRESS-...|  NULL|[0.0.0.0/8, 1.0.0.0/9, 1.128....|                           NULL|54504831|   RIPE|      0.0.0.0|ALLOCATED UNSPECIFIED|    false|2022|   11|  1|  20|
|  4|     AU|1528864175|Asia Pacific Network Informat...|  1.255.255.255|   1528864175|       APNIC-HM|                        APNIC-AP|  NULL|                     [1.0.0.0/8]|                           NULL|10565151|  APNIC|      1.0.0.0|   ALLOCATED PORTABLE|    false|2022|   11|  1|  20|
+---+-------+----------+--------------------------------+---------------+-------------+---------------+--------------------------------+------+--------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+
only showing top 3 rows
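The per-date approach mentioned in the note above could look roughly as follows; a minimal sketch that assumes a per-source record count is the result you want to keep per date. Adapt the aggregation to your own analysis.
# Sketch: aggregate one WHOIS date partition at a time, then combine the small results
per_date_results = []
for i_date in DO_DATES:
    i_df = spark.read.format("json").load(
        os.path.join(
            "s3a://{}/{}".format(S3_BUCKET.name, RIR_DATA_WHOIS_BASE),
            "year={}/month={:02d}/day={:02d}".format(i_date.year, i_date.month, i_date.day)
        )
    )
    # Reduce the partition to a small Pandas result before moving on to the next date
    i_result = i_df.groupBy("source").count().toPandas()
    i_result["date"] = i_date.date().isoformat()
    per_date_results.append(i_result)
combined_results = pd.concat(per_date_results, ignore_index=True)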
Stop SparkContext¶
sc.stop()
print("Stopped SparkContext")
Stopped SparkContext