RIR-Data.org -- PySpark notebook. Author: Mattijs Jonker <m.jonker[at]utwente.nl>. License: CC0.
Example code to load our RIR-data.org consolidated rDNS and WHOIS data sets, using PySpark or Python+boto+Pandas.
Please cite our paper if you make use of RIR-data.org data: Alfred Arouna, Ioana Livadariu, and Mattijs Jonker. "Lowering the Barriers to Working with Public RIR-Level Data." Proceedings of the 2023 Workshop on Applied Networking Research (ANRW '23).
import os
import datetime
import time
import tempfile
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as psf
import pandas as pd
import boto3
import botocore
# Spark configuration: local[1] master and an app name built from the
# current uid and the launch time in whole seconds.
sparkConf = SparkConf().setMaster("local[1]").setAppName(
    f"pyspark-{os.getuid()}-{int(time.time())}"
)

# All remaining options, grouped by concern; applied in one setAll() call.
spark_settings = {
    # executors
    "spark.executor.instances": "1",
    "spark.executor.cores": "1",
    "spark.executor.memory": "4G",
    "spark.executor.memoryOverhead": "512M",
    # driver
    "spark.driver.cores": "1",
    "spark.driver.memory": "2G",
    # RIR-data.org Object Storage settings (anonymous access over s3a)
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    "fs.s3a.endpoint": "https://data.rir-data.org",
    "fs.s3a.connection.ssl.enabled": "true",
    "fs.s3a.signing-algorithm": "S3SignerType",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.block.size": "16M",
    "fs.s3a.readahead.range": "1M",
    "fs.s3a.experimental.input.fadvise": "normal",
    # buffer sizes (64 MiB, expressed in bytes)
    "io.file.buffer.size": "67108864",
    "spark.buffer.size": "67108864",
}
sparkConf.setAll(list(spark_settings.items()))
print("SparkConf created")
SparkConf created
# Start (or reuse) a Spark session configured from sparkConf, and keep a
# handle on its SparkContext.
session_builder = SparkSession.builder.config(conf=sparkConf)
spark = session_builder.getOrCreate()
sc = spark.sparkContext
print("Started SparkSession")
Started SparkSession
# Load one day (2023-05-31) of the enriched rDNS data set as JSON.
# basePath is set to the data set root, one level above the year=/month=/day=
# directories that are actually read.
rdns_base_path = "s3a://rir-data/rirs-rdns-formatted/type=enriched"
rdns_day_paths = [rdns_base_path + "/year=2023/month=05/day=31/"]
rdns_reader = spark.read.format("json").option("basePath", rdns_base_path)
# Persist with MEMORY_AND_DISK so the printSchema/show calls reuse the loaded data.
my_rir_data_df = rdns_reader.load(rdns_day_paths).persist(
    pyspark.StorageLevel.MEMORY_AND_DISK
)
my_rir_data_df.printSchema()
root |-- adjusted: boolean (nullable = true) |-- af: long (nullable = true) |-- end_address: string (nullable = true) |-- host_bit: boolean (nullable = true) |-- prefixes: array (nullable = true) | |-- element: string (containsNull = true) |-- rdns: struct (nullable = true) | |-- name: array (nullable = true) | | |-- element: string (containsNull = true) | |-- origin: array (nullable = true) | | |-- element: string (containsNull = true) | |-- rdatasets: struct (nullable = true) | | |-- A: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- AAAA: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- CNAME: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- DS: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- NS: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- SOA: array (nullable = true) | | | |-- element: string (containsNull = true) | |-- rdclass: string (nullable = true) | |-- ttl: long (nullable = true) |-- rfc_2317: boolean (nullable = true) |-- source: string (nullable = true) |-- start_address: string (nullable = true) |-- timestamp: long (nullable = true) |-- year: integer (nullable = true) |-- month: integer (nullable = true) |-- day: integer (nullable = true) |-- hour: integer (nullable = true)
# Print the first two rDNS rows without truncating column contents.
my_rir_data_df.show(n=2, truncate=False)
# Load one day (2023-05-31) of the enriched WHOIS data set as JSON.
# basePath is set to the data set root, one level above the year=/month=/day=
# directories that are actually read.
whois_base_path = "s3a://rir-data/whois-formatted/type=enriched"
whois_day_paths = [whois_base_path + "/year=2023/month=05/day=31/"]
whois_reader = spark.read.format("json").option("basePath", whois_base_path)
# Persist with MEMORY_AND_DISK so the printSchema/show calls reuse the loaded data.
my_whois_data_df = whois_reader.load(whois_day_paths).persist(
    pyspark.StorageLevel.MEMORY_AND_DISK
)
my_whois_data_df.printSchema()
root |-- af: long (nullable = true) |-- country: string (nullable = true) |-- created: string (nullable = true) |-- descr: string (nullable = true) |-- end_address: string (nullable = true) |-- last-modified: string (nullable = true) |-- mnt-by: string (nullable = true) |-- netname: string (nullable = true) |-- origin: string (nullable = true) |-- prefixes: array (nullable = true) | |-- element: string (containsNull = true) |-- related_delegation_prefix_match: string (nullable = true) |-- serial: long (nullable = true) |-- source: string (nullable = true) |-- start_address: string (nullable = true) |-- status: string (nullable = true) |-- use_route: boolean (nullable = true) |-- year: integer (nullable = true) |-- month: integer (nullable = true) |-- day: integer (nullable = true) |-- hour: integer (nullable = true)
# Print the first two WHOIS rows without truncating column contents.
my_whois_data_df.show(n=2, truncate=False)
+---+-------+----------+----------------------------------------------+---------------+-------------+---------------+----------------------------------+------+------------------------------------------------------------------------------------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+ |af |country|created |descr |end_address |last-modified|mnt-by |netname |origin|prefixes |related_delegation_prefix_match|serial |source |start_address|status |use_route|year|month|day|hour| +---+-------+----------+----------------------------------------------+---------------+-------------+---------------+----------------------------------+------+------------------------------------------------------------------------------------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+ |4 |EU |991094400 |The whole IPv4 address space |255.255.255.255|1107561600 |AFRINIC-HM-MNT |IANA-BLK |null |[0.0.0.0/0] |null |1291449 |AFRINIC|0.0.0.0 |ALLOCATED UNSPECIFIED|false |2023|5 |31 |20 | |4 |EU |1638804396|IPv4 address block not managed by the RIPE NCC|1.178.111.255 |1638804396 |RIPE-NCC-HM-MNT|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|null |[0.0.0.0/8, 1.0.0.0/9, 1.128.0.0/11, 1.160.0.0/12, 1.176.0.0/15, 1.178.0.0/18, 1.178.64.0/19, 1.178.96.0/20]|null |56439066|RIPE |0.0.0.0 |ALLOCATED UNSPECIFIED|false |2023|5 |31 |20 | +---+-------+----------+----------------------------------------------+---------------+-------------+---------------+----------------------------------+------+------------------------------------------------------------------------------------------------------------+-------------------------------+--------+-------+-------------+---------------------+---------+----+-----+---+----+ only showing top 2 rows
# Shut Spark down through the session rather than the bare context:
# SparkSession.stop() stops the underlying SparkContext (the same object as
# `sc`) and also clears the cached session, so a later getOrCreate() builds a
# fresh one instead of finding a session bound to a stopped context.
spark.stop()
# Requests are sent unsigned (no credentials).
unsigned_config = botocore.config.Config(signature_version=botocore.UNSIGNED)

# S3 resource pointed at the RIR-data.org object storage endpoint
S3R_RIR_DATA = boto3.resource(
    "s3",
    region_name="nl-utwente",
    endpoint_url="https://data.rir-data.org",
    config=unsigned_config,
)

# The underlying client, for lower-level actions if needed
S3C_RIR_DATA = S3R_RIR_DATA.meta.client

# Unregister botocore's fix_s3_host handler to prevent some requests from
# going to AWS instead of the RIR-data.org server
S3C_RIR_DATA.meta.events.unregister("before-sign.s3", botocore.utils.fix_s3_host)

# The RIR data bucket
RIR_DATA_BUCKET = S3R_RIR_DATA.Bucket("rir-data")

# Base key prefixes of the WHOIS and rDNS data sets
RIR_DATA_WHOIS_BASE = "whois-formatted/type=enriched"
RIR_DATA_RDNS_BASE = "rirs-rdns-formatted/type=enriched"
# Day of rDNS data to download and load into Pandas
do_date = datetime.datetime(2023, 5, 31)

# Build the partition prefix for the given data set and date. S3 keys always
# use "/" as separator, so join explicitly instead of relying on os.path.join.
partition_prefix = "/".join([
    RIR_DATA_RDNS_BASE,
    "year={}".format(do_date.year),
    "month={:02d}".format(do_date.month),
    "day={:02d}".format(do_date.day),
])

# The transfer configuration is the same for every object: build it once.
transfer_config = boto3.s3.transfer.TransferConfig(multipart_chunksize=16 * 1024 * 1024)

pandas_df_list = []
for i_obj in RIR_DATA_BUCKET.objects.filter(Prefix=partition_prefix):
    # Open a temporary file to download the object into. delete=False keeps
    # the file on disk after it is closed so it can be re-opened by name;
    # the finally-clause below removes it again.
    tempFile = tempfile.NamedTemporaryFile(
        mode="w+b",
        dir="/tmp",
        prefix="{}.".format(do_date.date().isoformat()),
        suffix=".jsonl.bz2",
        delete=False,
    )
    try:
        with tempFile:
            print("Opened temporary file for object download: '{}'.".format(tempFile.name))
            RIR_DATA_BUCKET.download_fileobj(Key=i_obj.key, Fileobj=tempFile, Config=transfer_config)
        # The file is closed (and therefore fully flushed) at this point, so
        # the size reported and the data read below are complete.
        print("Downloaded '{}' [{:.2f}MiB] into '{}'.".format(
            "/".join([RIR_DATA_BUCKET.name, i_obj.key]),
            os.path.getsize(tempFile.name) / (1024*1024),
            tempFile.name
        ))
        # Use Pandas to read the bz2-compressed JSON-lines file into a DF and append to list
        pandas_df_list.append(
            pd.read_json(tempFile.name, compression="bz2", lines=True)
        )
    finally:
        # Always remove the temporary file, also when download or parsing fails
        os.unlink(tempFile.name)

# pd.concat raises a generic ValueError on an empty list; raise the same
# exception type with a message that names the prefix that matched nothing.
if not pandas_df_list:
    raise ValueError("No objects found under prefix '{}'.".format(partition_prefix))

# Concatenate object-specific DFs
pandas_df = pd.concat(pandas_df_list)
Opened temporary file for object download: '/tmp/2023-05-31.w5vgonys.jsonl.bz2'. Downloaded 'rir-data/rirs-rdns-formatted/type=enriched/year=2023/month=05/day=31/hour=00/all_rdns__pytricia_2023053100_2023053123_without_RRSIG_NSEC_DNSKEY.jsonl.bz2' [10.56MiB] into '/tmp/2023-05-31.w5vgonys.jsonl.bz2'.