sparkrawl Demo¶

This notebook demonstrates the main usage patterns that sparkrawl is designed for.

In [1]:
from sparkrawl import *

# Utilities for test/demo only; not packaged.
from ospath import *
from fileio import *
from testcases import *

Ad hoc Data Layouts¶

sparkrawl is motivated by the need to crawl custom data layouts, e.g., in early-stage data projects. For this demo we will simulate data retrieval from S3 object storage.

We start by initializing a fake data store and populating it with example data, organized according to an ad hoc path embedding of three example attributes: color, year, and size.

In [2]:
# Initialize fake S3.
from pathlib import Path
import tempfile

tmpdir = tempfile.TemporaryDirectory()
tmp_path = Path(tmpdir.name)

from fakes3 import *
fake_s3 = FakeS3(tmp_path / "fake-s3")

from fileio import write_jsonlines

def write_s3_json(data, uri: str):
    with tempfile.NamedTemporaryFile() as f:
        fpath = Path(f.name)
        write_jsonlines(data, fpath)
        fake_s3.put(fpath, uri)

# Populate with example data.
data_root_uri = "s3://fake-bucket/fake-prefix"
prefix = Uri.from_uri(data_root_uri)

COLORS = ["green", "red", "blue"]
YEARS = [2024, 2025]
SIZES = ["large", "small"]

import random

for c in COLORS:
    for y in YEARS:
        for sz in SIZES:
            for k in range(4):
                data = [{"x": random.random()} for _ in range(10)]
                uri_ = prefix / c / str(y) / f"size={sz}" / f"{k}.jsonl"
                uri = str(uri_)
                write_s3_json(data, uri)

def list_all(uri):
    return [
        *(uri_ for uri_ in fake_s3.list_objects(uri)),
        *(
            uri__
            for uri_ in fake_s3.list_prefixes(uri)
            for uri__ in list_all(uri_)            
        ),
    ]

print("Example data..")
print(data)
print()

print("First few partitions..")
list_all(data_root_uri)[:10]
Example data..
[{'x': 0.42094823536397274}, {'x': 0.8173876154560311}, {'x': 0.9524356175681418}, {'x': 0.5183503776883939}, {'x': 0.291400519606167}, {'x': 0.2707331207445187}, {'x': 0.800316351377395}, {'x': 0.052817573828492614}, {'x': 0.8289190419259124}, {'x': 0.14086218239897608}]

First few partitions..
Out[2]:
['s3://fake-bucket/fake-prefix/red/2025/size=small/3.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=small/2.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=small/0.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=small/1.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=large/3.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=large/2.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=large/0.jsonl',
 's3://fake-bucket/fake-prefix/red/2025/size=large/1.jsonl',
 's3://fake-bucket/fake-prefix/red/2024/size=small/3.jsonl',
 's3://fake-bucket/fake-prefix/red/2024/size=small/2.jsonl']

Note that the individual files only contain measurements of a scalar x. We need to parse the object URIs to glean the additional contextual data.

Branching with embedded attributes¶

First, let's look at what we might do without using sparkrawl.

We see that the first branch level under the root prefix encodes the color attribute. As in our example, such a field may not appear in the records themselves; there are various reasons it might be omitted, but a common one is to avoid storing a lot of redundant text.

For data processing purposes, however, we typically want those fields to be present in the records. So we can define a branch iterator that yields each sub-prefix together with its associated metadata. In general, a branch could embed several attributes, so we tag each prefix with a dict.

In [3]:
def list_prefixes_with_color_metadata(uri):
    """This method knows how to interpet the directory name at this level as metadata."""
    return [
        (prefix, dict(color=uri_name(prefix)))
        for prefix in fake_s3.list_prefixes(uri)
    ]
    
def uri_name(uri: str):
    return Uri.from_uri(uri).key_path.name

list_prefixes_with_color_metadata(data_root_uri)
Out[3]:
[('s3://fake-bucket/fake-prefix/red', {'color': 'red'}),
 ('s3://fake-bucket/fake-prefix/green', {'color': 'green'}),
 ('s3://fake-bucket/fake-prefix/blue', {'color': 'blue'})]

Now we can easily query the metadata associated with any prefix to obtain the color value of all records under it.

Diving deeper¶

When we crawl deeper, we will want to collect all the metadata along each path from the root.

Let's try collecting all prefixes at depth 2.

In [4]:
def list_prefixes_with_year_metadata(uri):
    return [
        (prefix, dict(year=int(uri_name(prefix))))
        for prefix in fake_s3.list_prefixes(uri)
    ]

def list_prefixes_with_color_and_year_metadata(tagged):
    prefix, base_attrs = tagged
    return [
        (year_prefix, {**base_attrs, **color_attrs, **year_attrs})
        for color_prefix, color_attrs in list_prefixes_with_color_metadata(prefix)
        for year_prefix, year_attrs in list_prefixes_with_year_metadata(color_prefix)
    ]

tagged = data_root_uri, {"global_attr": 42}
list_prefixes_with_color_and_year_metadata(tagged)
Out[4]:
[('s3://fake-bucket/fake-prefix/red/2025',
  {'global_attr': 42, 'color': 'red', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/red/2024',
  {'global_attr': 42, 'color': 'red', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/green/2025',
  {'global_attr': 42, 'color': 'green', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/green/2024',
  {'global_attr': 42, 'color': 'green', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/blue/2025',
  {'global_attr': 42, 'color': 'blue', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/blue/2024',
  {'global_attr': 42, 'color': 'blue', 'year': 2024})]

Enter sparkrawl, AKA Does it scale?¶

The straightforward approach gets out of hand quickly with even a few levels of nesting. Writing or updating crawlers this way can be time-consuming and error-prone.

This is where sparkrawl comes in. sparkrawl lets us build crawlers out of modular components.

The central concept of sparkrawl is an "exploder". An exploder is a tagged branch iterator just like the ones we've defined above, except that it takes a tagged parent element as input rather than the parent element alone. sparkrawl standardizes on this particular function signature simply because it is the most general input/output format.

We only need a small adjustment to put the iterator functions above into "exploder" form.

In [5]:
def as_exploder(key_iter_fn):
    
    def exploder(tagged):
        key, _attrs = tagged
        return key_iter_fn(key)

    return exploder

color_exploder = as_exploder(list_prefixes_with_color_metadata)
year_exploder = as_exploder(list_prefixes_with_year_metadata)

Time to "explode"¶

The central utility of sparkrawl is the explode_with function, which decorates exploders to handle the merging of parent attributes into child ones during a crawl.
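
Under the hood, that merging is essentially all explode_with has to do. A minimal sketch of the idea, consistent with the behavior shown below but not sparkrawl's actual implementation:

def explode_with_sketch(exploder):
    """Illustrative only -- use sparkrawl's explode_with in practice."""
    def exploded(tagged):
        _parent_key, parent_attrs = tagged
        for child_key, child_attrs in exploder(tagged):
            # Child attributes win on key collisions, mirroring the dict merges above.
            yield child_key, {**parent_attrs, **child_attrs}
    return exploded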

In [6]:
[
    tagged__
    for tagged_ in explode_with(color_exploder)(tagged)
    for tagged__ in explode_with(year_exploder)(tagged_)
]  # same depth-2 crawl as above
Out[6]:
[('s3://fake-bucket/fake-prefix/red/2025',
  {'global_attr': 42, 'color': 'red', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/red/2024',
  {'global_attr': 42, 'color': 'red', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/green/2025',
  {'global_attr': 42, 'color': 'green', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/green/2024',
  {'global_attr': 42, 'color': 'green', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/blue/2025',
  {'global_attr': 42, 'color': 'blue', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/blue/2024',
  {'global_attr': 42, 'color': 'blue', 'year': 2024})]

sparkrawl provides a convenience function, fan_out, to "flatten" a chain of exploders.

In [7]:
crawler = fan_out(
    explode_with(color_exploder),
    explode_with(year_exploder),
)
list(crawler(tagged))
Out[7]:
[('s3://fake-bucket/fake-prefix/red/2025',
  {'global_attr': 42, 'color': 'red', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/red/2024',
  {'global_attr': 42, 'color': 'red', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/green/2025',
  {'global_attr': 42, 'color': 'green', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/green/2024',
  {'global_attr': 42, 'color': 'green', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/blue/2025',
  {'global_attr': 42, 'color': 'blue', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/blue/2024',
  {'global_attr': 42, 'color': 'blue', 'year': 2024})]
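
Conceptually, fan_out just flat-maps each stage over the output of the previous one. A rough sketch of the idea (illustrative only, not the packaged implementation):

def fan_out_sketch(*stages):
    def crawl(tagged):
        frontier = [tagged]
        for stage in stages:
            # Apply this stage to every element of the frontier and flatten the results.
            frontier = [child for parent in frontier for child in stage(parent)]
        return frontier
    return crawl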

Usage with Spark RDDs¶

Everything has been done locally so far, but the same crawlers and exploders can be fed to distributed data processing frameworks like Spark.

Here's what the same crawl looks like with flatMap on RDDs. (This demo uses pysparkling, a pure-Python, in-process implementation of the Spark RDD interface, so everything still runs locally.)

In [8]:
import pysparkling
sc = pysparkling.Context()

rdd = sc.parallelize([tagged])

rdd.flatMap(explode_with(color_exploder)).flatMap(explode_with(year_exploder)).collect()
Out[8]:
[('s3://fake-bucket/fake-prefix/red/2025',
  {'global_attr': 42, 'color': 'red', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/red/2024',
  {'global_attr': 42, 'color': 'red', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/green/2025',
  {'global_attr': 42, 'color': 'green', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/green/2024',
  {'global_attr': 42, 'color': 'green', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/blue/2025',
  {'global_attr': 42, 'color': 'blue', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/blue/2024',
  {'global_attr': 42, 'color': 'blue', 'year': 2024})]
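
On a real cluster, the same chain should work unchanged with PySpark. A sketch, assuming a working Spark installation and exploders backed by a real object store client (the local fake_s3 used here would not be reachable from cluster workers):

from pyspark.sql import SparkSession

# Hypothetical cluster setup -- not run in this demo.
spark = SparkSession.builder.appName("sparkrawl-demo").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([tagged])
rdd.flatMap(explode_with(color_exploder)).flatMap(explode_with(year_exploder)).collect()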

Modular crawlers¶

explode_with is unopinionated about how you build your exploders; users are free to implement their own modularity strategies. However, sparkrawl includes a few functional programming (FP) utilities that help build exploders without writing as many one-off functions. These utilities are entirely optional, but can be helpful in some cases.

Here's the same crawl with different but equivalent exploder definitions, now using minimal custom functions.

In [9]:
# Build branch-level exploders from primitives, without ad hoc function defs.
color_exploder = pipeline(key_only, fake_s3.list_prefixes, for_each(with_attribs(color=uri_name)))
year_exploder = pipeline(key_only, fake_s3.list_prefixes, for_each(with_attribs(year=pipeline(uri_name, int))))

rdd.flatMap(explode_with(color_exploder)).flatMap(explode_with(year_exploder)).collect()
Out[9]:
[('s3://fake-bucket/fake-prefix/red/2025',
  {'global_attr': 42, 'color': 'red', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/red/2024',
  {'global_attr': 42, 'color': 'red', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/green/2025',
  {'global_attr': 42, 'color': 'green', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/green/2024',
  {'global_attr': 42, 'color': 'green', 'year': 2024}),
 ('s3://fake-bucket/fake-prefix/blue/2025',
  {'global_attr': 42, 'color': 'blue', 'year': 2025}),
 ('s3://fake-bucket/fake-prefix/blue/2024',
  {'global_attr': 42, 'color': 'blue', 'year': 2024})]

We have just introduced pipeline, key_only, for_each, and with_attribs. These are lightweight functional programming constructs (minimal sketches follow the list):

  • pipeline(*fn_seq) applies each function in the sequence to the output of the previous one, starting with the input.
  • key_only returns the key of a key-value pair.
  • for_each applies its function argument to each element of an input collection.
  • with_attribs(attrib_fns) applies a dictionary of functions to its input to derive a dictionary of attributes to tag it with.
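
Here are minimal sketches consistent with how these utilities behave in this notebook (including the appendix below). They are illustrative only, not sparkrawl's source; the _sketch suffixes avoid shadowing the real imports.

def pipeline_sketch(*fn_seq):
    def piped(x):
        for fn in fn_seq:
            x = fn(x)
        return x
    return piped

def key_only_sketch(tagged):
    key, _attrs = tagged
    return key

def for_each_sketch(fn):
    return lambda coll: [fn(item) for item in coll]

def with_attribs_sketch(attrib_fns=None, **kw_fns):
    fns = {**(attrib_fns or {}), **kw_fns}
    return lambda x: (x, {name: fn(x) for name, fn in fns.items()})

assert pipeline_sketch(range, sum, lambda x: x ** 2)(4) == sum(range(4)) ** 2
assert with_attribs_sketch(strlen=len)("hello") == ("hello", {"strlen": 5})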

A few other useful functions are documented in the API docs.

One thing the new modular definitions make clear is that it would be easy to swap out our fake S3 client, e.g., for a real boto3-backed one.
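
For example, a boto3-backed replacement for fake_s3.list_prefixes might look roughly like the sketch below. This is an untested illustration: it assumes boto3 credentials are configured and ignores pagination.

from urllib.parse import urlparse
import boto3

def boto_list_prefixes(uri: str):
    """Hypothetical drop-in for fake_s3.list_prefixes (sketch only; no pagination)."""
    parsed = urlparse(uri)  # e.g. "s3://bucket/some/prefix"
    bucket = parsed.netloc
    prefix = parsed.path.strip("/") + "/"
    resp = boto3.client("s3").list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/")
    return [
        f"s3://{bucket}/{cp['Prefix'].rstrip('/')}"
        for cp in resp.get("CommonPrefixes", [])
    ]

# e.g. color_exploder = pipeline(key_only, boto_list_prefixes, for_each(with_attribs(color=uri_name)))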

Iterating the data itself¶

So far we have only crawled to depth 2, but we now have the tools to build a crawler that reaches the data itself.

Below, we build an RDD over the full data set in Spark.

In [10]:
def parquet_attribs(uri):
    """
    "s3://..../size=small" -> {"size": "small"}
    """
    attrib, value = uri_name(uri).split("=", maxsplit=1)
    return {attrib: value}

parquet_exploder = pipeline(key_only, fake_s3.list_prefixes, for_each(compute_value(parquet_attribs)))
partition_exploder = pipeline(key_only, fake_s3.list_objects, for_each(with_attribs()))  # empty attribs

def read_s3_json(uri):
    with tempfile.NamedTemporaryFile() as f:
        fake_s3.get(uri, f.name)
        yield from read_jsonlines(Path(f.name))

record_exploder = pipeline(key_only, read_s3_json, for_each(key_by_none))  # key_by_none tags None with the input dict

record_rdd = (rdd
    .flatMap(explode_with(color_exploder))
    .flatMap(explode_with(year_exploder))
    .flatMap(explode_with(parquet_exploder))
    .flatMap(explode_with(partition_exploder))
    .flatMap(explode_with(record_exploder))
    .map(drop_key)
)
    
record_rdd.take(5)
Out[10]:
[{'global_attr': 42,
  'color': 'red',
  'year': 2025,
  'size': 'small',
  'x': 0.5642365541274641},
 {'global_attr': 42,
  'color': 'red',
  'year': 2025,
  'size': 'small',
  'x': 0.8313229586808745},
 {'global_attr': 42,
  'color': 'red',
  'year': 2025,
  'size': 'small',
  'x': 0.7989672717299621},
 {'global_attr': 42,
  'color': 'red',
  'year': 2025,
  'size': 'small',
  'x': 0.8330906190954932},
 {'global_attr': 42,
  'color': 'red',
  'year': 2025,
  'size': 'small',
  'x': 0.5430766499556332}]

Now each record contains all the fields obtained during the crawl to locate it.

Reconfiguration of modular crawlers¶

A nice thing about the modular approach is that stages can easily be split up or recombined. For example, sometimes we want to crawl and collect the files to process in one phase, then process the data itself in another.

In fact, with Spark we sometimes collect the URIs back to the driver so they can be redistributed more evenly across the workers:

In [11]:
# Crawl to collect all data files.
file_crawler = fan_out(
    explode_with(color_exploder),
    explode_with(year_exploder),
    explode_with(parquet_exploder),
    explode_with(partition_exploder)
)
files_with_metadata = rdd.flatMap(file_crawler).collect()

def take(coll, n):
    return [
        i for i, _ in zip(coll, range(n))
    ]

print("Some tagged files..")
print(take(files_with_metadata, 5))
print()

# Redistribute data files and extract contents.
tagged_files_rdd = sc.parallelize(files_with_metadata)
record_rdd = tagged_files_rdd.flatMap(explode_with(record_exploder)).map(drop_key)

# Show a random sample.
count = record_rdd.count()
p = 10 / count
print("Some records..")
record_rdd.sample(withReplacement=False, fraction=p).collect()[:5]
Some tagged files..
[('s3://fake-bucket/fake-prefix/red/2025/size=small/3.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=small/2.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=small/0.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=small/1.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=large/3.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'large'})]

Some records..
Out[11]:
[{'global_attr': 42,
  'color': 'red',
  'year': 2024,
  'size': 'small',
  'x': 0.15904196615867394},
 {'global_attr': 42,
  'color': 'red',
  'year': 2024,
  'size': 'small',
  'x': 0.864855951488156},
 {'global_attr': 42,
  'color': 'red',
  'year': 2024,
  'size': 'small',
  'x': 0.7143011033179311},
 {'global_attr': 42,
  'color': 'red',
  'year': 2024,
  'size': 'small',
  'x': 0.7875198451927558},
 {'global_attr': 42,
  'color': 'green',
  'year': 2025,
  'size': 'small',
  'x': 0.4198504961792958}]

With pandas¶

sparkrawl also works on pandas.DataFrames, via the explode_df function.

Starting with the root URI in a pandas DataFrame, along with any global attributes...

In [12]:
import pandas as pd

uri, attrs = tagged
records = [dict(uri=uri, **attrs)]
pdf = pd.DataFrame.from_records(records)
pdf
Out[12]:
                            uri  global_attr
0  s3://fake-bucket/fake-prefix           42

...we can "explode" the dataframe with the same file_crawler used above.

In [13]:
file_pdf = explode_df(
    pdf,  # The source DF
    "uri",  # The name of the column to use as parent
    pipeline(file_crawler, for_each(inject_key("file_uri")))  # The file crawler above with output transformed into a record iterator
)
file_pdf.head(5)
Out[13]:
   global_attr color  year   size                                           file_uri
0           42   red  2025  small  s3://fake-bucket/fake-prefix/red/2025/size=sma...
1           42   red  2025  small  s3://fake-bucket/fake-prefix/red/2025/size=sma...
2           42   red  2025  small  s3://fake-bucket/fake-prefix/red/2025/size=sma...
3           42   red  2025  small  s3://fake-bucket/fake-prefix/red/2025/size=sma...
4           42   red  2025  large  s3://fake-bucket/fake-prefix/red/2025/size=lar...

The only new utility here is inject_key, which inserts the key of a tagged element into its attributes dict.

In [14]:
import copy

print("Tagged: ", tagged)
print("Injected: ", inject_key("file_uri")(copy.deepcopy(tagged)))  # `inject_key` modifies the input dict for performance reasons
Tagged:  ('s3://fake-bucket/fake-prefix', {'global_attr': 42})
Injected:  {'global_attr': 42, 'file_uri': 's3://fake-bucket/fake-prefix'}
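
Putting these together: explode_df only needs to crawl from each row's key column and collect the resulting records into a new DataFrame. A rough sketch of the idea (illustrative only, not the packaged implementation, and ignoring details like column ordering):

def explode_df_sketch(pdf, key_column, record_iter_fn):
    out_records = []
    for row in pdf.to_dict(orient="records"):
        key = row.pop(key_column)                  # use this column as the parent key
        for record in record_iter_fn((key, row)):  # crawl from the tagged parent
            out_records.append(record)
    return pd.DataFrame.from_records(out_records)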

...and Spark DataFrames too¶

sparkrawl also provides a Spark DataFrame version of the same operation, explode_spark_df.

In [15]:
from pysparkling.sql.session import SparkSession
sess = SparkSession(sc)

file_sdf = sess.createDataFrame(file_pdf)

record_sdf = explode_spark_df(
    file_sdf,
    "file_uri",
    pipeline(explode_with(record_exploder), for_each(drop_key))
)
record_sdf.show(10)
+-----------+-----+----+-----+-------------------+
|global_attr|color|year| size|                  x|
+-----------+-----+----+-----+-------------------+
|         42|  red|2025|small| 0.5642365541274641|
|         42|  red|2025|small| 0.8313229586808745|
|         42|  red|2025|small| 0.7989672717299621|
|         42|  red|2025|small| 0.8330906190954932|
|         42|  red|2025|small| 0.5430766499556332|
|         42|  red|2025|small|0.04101010757884138|
|         42|  red|2025|small| 0.1973572306717718|
|         42|  red|2025|small| 0.4933307365903946|
|         42|  red|2025|small| 0.5283189997965133|
|         42|  red|2025|small|0.24376168717000823|
+-----------+-----+----+-----+-------------------+
only showing top 10 rows

Appendix: Functional Utility Examples¶

In [16]:
# AKA https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.compose_left
assert pipeline(range, sum, lambda x: x ** 2)(4) == sum(range(4)) ** 2
In [17]:
tup = 5, {"ignore": "me"}
print(tup, key_only(tup))

# Note, `as_exploder` from above can be equivalently expressed in terms of `key_only`.
as_exploder_ = lambda key_iter_fn: pipeline(key_only, key_iter_fn)
color_exploder_ = as_exploder_(list_prefixes_with_color_metadata)
assert list(color_exploder_(tagged)) == list(color_exploder(tagged))
(5, {'ignore': 'me'}) 5
In [18]:
with_strlen = with_attribs(dict(strlen=lambda string: len(string)))
print(with_strlen("hello"))

# and `for_each`
each_with_strlen = for_each(with_strlen)
print(list(each_with_strlen(["hello", "again!"])))
('hello', {'strlen': 5})
[('hello', {'strlen': 5}), ('again!', {'strlen': 6})]