sparkrawl Demo¶
This notebook demonstrates the main usage patterns that sparkrawl is designed for.
from sparkrawl import *
# Utilities for test/demo only; not packaged.
from ospath import *
from fileio import *
from testcases import *
Ad hoc Data Layouts¶
sparkrawl is motivated by crawling custom data layouts, e.g., for early-stage data projects. So for this demo we will simulate data retrieval from S3 object storage.
We start by initializing a fake data store, and populating it with example data, organized according to an ad hoc path embedding of example attributes: color, year, size.
# Initialize fake S3.
from pathlib import Path
import tempfile
tmpdir = tempfile.TemporaryDirectory()
tmp_path = Path(tmpdir.name)
from fakes3 import *
fake_s3 = FakeS3(tmp_path / "fake-s3")
from fileio import write_jsonlines
def write_s3_json(data, uri: str):
    with tempfile.NamedTemporaryFile() as f:
        fpath = Path(f.name)
        write_jsonlines(data, fpath)
        fake_s3.put(fpath, uri)
# Populate with example data.
data_root_uri = "s3://fake-bucket/fake-prefix"
prefix = Uri.from_uri(data_root_uri)
COLORS = ["green", "red", "blue"]
YEARS = [2024, 2025]
SIZES = ["large", "small"]
import random
for c in COLORS:
    for y in YEARS:
        for sz in SIZES:
            for k in range(4):
                data = [{"x": random.random()} for _ in range(10)]
                uri_ = prefix / c / str(y) / f"size={sz}" / f"{k}.jsonl"
                uri = str(uri_)
                write_s3_json(data, uri)
def list_all(uri):
    return [
        *(uri_ for uri_ in fake_s3.list_objects(uri)),
        *(
            uri__
            for uri_ in fake_s3.list_prefixes(uri)
            for uri__ in list_all(uri_)
        ),
    ]
print("Example data..")
print(data)
print()
print("First few partitions..")
list_all(data_root_uri)[:10]
Example data..
[{'x': 0.42094823536397274}, {'x': 0.8173876154560311}, {'x': 0.9524356175681418}, {'x': 0.5183503776883939}, {'x': 0.291400519606167}, {'x': 0.2707331207445187}, {'x': 0.800316351377395}, {'x': 0.052817573828492614}, {'x': 0.8289190419259124}, {'x': 0.14086218239897608}]

First few partitions..
['s3://fake-bucket/fake-prefix/red/2025/size=small/3.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=small/2.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=small/0.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=small/1.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=large/3.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=large/2.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=large/0.jsonl', 's3://fake-bucket/fake-prefix/red/2025/size=large/1.jsonl', 's3://fake-bucket/fake-prefix/red/2024/size=small/3.jsonl', 's3://fake-bucket/fake-prefix/red/2024/size=small/2.jsonl']
Note that the individual files only contain measurements of a scalar x. We need to parse the object URIs to glean the additional contextual data.
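For instance, parsing a single object URI by hand might look like this (an illustrative one-off, using the layout above):
# Illustrative only: pull color/year/size out of one object URI by hand.
example_uri = "s3://fake-bucket/fake-prefix/red/2025/size=small/3.jsonl"
color, year, size_part, _fname = example_uri.split("/")[-4:]
print(color, int(year), size_part.split("=", 1)[1])  # -> red 2025 small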
Branching with embedded attributes¶
First, let's look at what we might do without using sparkrawl.
We see that the first branch from the root prefix encodes the color attribute. As in our example, this field may not actually appear in the records themselves. There are various reasons it may be omitted, but a common one is to avoid storing a lot of redundant text.
For data processing purposes, however, we typically want those fields to be present in the records.
So we can define a branch iterator that yields each sub-prefix with associated metadata.
In general, a branch could embed a number of attributes, so we tag with dicts.
def list_prefixes_with_color_metadata(uri):
    """This method knows how to interpret the directory name at this level as metadata."""
    return [
        (prefix, dict(color=uri_name(prefix)))
        for prefix in fake_s3.list_prefixes(uri)
    ]

def uri_name(uri: str):
    return Uri.from_uri(uri).key_path.name
list_prefixes_with_color_metadata(data_root_uri)
[('s3://fake-bucket/fake-prefix/red', {'color': 'red'}), ('s3://fake-bucket/fake-prefix/green', {'color': 'green'}), ('s3://fake-bucket/fake-prefix/blue', {'color': 'blue'})]
Now we can easily query the metadata associated with any prefix to obtain the color value of all records under it.
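For example, a quick lookup from prefix to color value (a sketch based on the listing above):
# Sketch: map each first-level prefix to its color attribute.
prefix_to_color = {
    prefix_: attrs["color"]
    for prefix_, attrs in list_prefixes_with_color_metadata(data_root_uri)
}
print(prefix_to_color["s3://fake-bucket/fake-prefix/red"])  # -> 'red'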
Diving deeper¶
When we crawl deeper, we will want to collect all the metadata along each path from the root.
Let's try collecting all prefixes at depth 2.
def list_prefixes_with_year_metadata(uri):
    return [
        (prefix, dict(year=int(uri_name(prefix))))
        for prefix in fake_s3.list_prefixes(uri)
    ]

def list_prefixes_with_color_and_year_metadata(tagged):
    prefix, base_attrs = tagged
    return [
        (year_prefix, {**base_attrs, **color_attrs, **year_attrs})
        for color_prefix, color_attrs in list_prefixes_with_color_metadata(prefix)
        for year_prefix, year_attrs in list_prefixes_with_year_metadata(color_prefix)
    ]
tagged = data_root_uri, {"global_attr": 42}
list_prefixes_with_color_and_year_metadata(tagged)
[('s3://fake-bucket/fake-prefix/red/2025', {'global_attr': 42, 'color': 'red', 'year': 2025}), ('s3://fake-bucket/fake-prefix/red/2024', {'global_attr': 42, 'color': 'red', 'year': 2024}), ('s3://fake-bucket/fake-prefix/green/2025', {'global_attr': 42, 'color': 'green', 'year': 2025}), ('s3://fake-bucket/fake-prefix/green/2024', {'global_attr': 42, 'color': 'green', 'year': 2024}), ('s3://fake-bucket/fake-prefix/blue/2025', {'global_attr': 42, 'color': 'blue', 'year': 2025}), ('s3://fake-bucket/fake-prefix/blue/2024', {'global_attr': 42, 'color': 'blue', 'year': 2024})]
Enter sparkrawl, AKA Does it scale?¶
The straightforward approach can get out of hand quickly with even just a few levels of nesting. Writing or updating crawlers this way can be time-consuming and error-prone.
This is where sparkrawl comes in.
sparkrawl lets us build crawlers out of modular components.
The central concept of sparkrawl is an "exploder".
An exploder is a tagged branch iterator just like those we've defined above, but it takes a tagged parent element as input, rather than the parent element alone. sparkrawl focuses on this particular function signature only because it is the most general input-output format.
We only have to make a small adjustment to the iterator functions above to have them in "exploder" form.
def as_exploder(key_iter_fn):
    def exploder(tagged):
        key, _attrs = tagged
        return key_iter_fn(key)
    return exploder
color_exploder = as_exploder(list_prefixes_with_color_metadata)
year_exploder = as_exploder(list_prefixes_with_year_metadata)
Time to "explode"¶
The central utility of sparkrawl is the explode_with function, which decorates exploders to handle the merging of parent attributes into child ones during a crawl.
[
    tagged__
    for tagged_ in explode_with(color_exploder)(tagged)
    for tagged__ in explode_with(year_exploder)(tagged_)
]  # same depth-2 crawl as above
[('s3://fake-bucket/fake-prefix/red/2025', {'global_attr': 42, 'color': 'red', 'year': 2025}), ('s3://fake-bucket/fake-prefix/red/2024', {'global_attr': 42, 'color': 'red', 'year': 2024}), ('s3://fake-bucket/fake-prefix/green/2025', {'global_attr': 42, 'color': 'green', 'year': 2025}), ('s3://fake-bucket/fake-prefix/green/2024', {'global_attr': 42, 'color': 'green', 'year': 2024}), ('s3://fake-bucket/fake-prefix/blue/2025', {'global_attr': 42, 'color': 'blue', 'year': 2025}), ('s3://fake-bucket/fake-prefix/blue/2024', {'global_attr': 42, 'color': 'blue', 'year': 2024})]
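For intuition, the merging that explode_with performs can be sketched roughly as follows (an illustrative sketch only, not sparkrawl's actual implementation):
# Rough sketch of explode_with's behavior; the real function may differ.
def explode_with_sketch(exploder):
    def wrapped(tagged):
        _key, parent_attrs = tagged
        for child_key, child_attrs in exploder(tagged):
            # Merge parent attributes into each child's attributes.
            yield child_key, {**parent_attrs, **child_attrs}
    return wrapped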
sparkrawl provides a convenience function to "flatten" a chain of exploders.
crawler = fan_out(
    explode_with(color_exploder),
    explode_with(year_exploder),
)
list(crawler(tagged))
[('s3://fake-bucket/fake-prefix/red/2025', {'global_attr': 42, 'color': 'red', 'year': 2025}), ('s3://fake-bucket/fake-prefix/red/2024', {'global_attr': 42, 'color': 'red', 'year': 2024}), ('s3://fake-bucket/fake-prefix/green/2025', {'global_attr': 42, 'color': 'green', 'year': 2025}), ('s3://fake-bucket/fake-prefix/green/2024', {'global_attr': 42, 'color': 'green', 'year': 2024}), ('s3://fake-bucket/fake-prefix/blue/2025', {'global_attr': 42, 'color': 'blue', 'year': 2025}), ('s3://fake-bucket/fake-prefix/blue/2024', {'global_attr': 42, 'color': 'blue', 'year': 2024})]
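Conceptually, fan_out just chains the exploders like nested flatMaps. A rough sketch (illustrative only, not the library's actual code):
# Rough sketch of the chaining fan_out performs; the real implementation may differ.
def fan_out_sketch(*steps):
    def crawl(tagged):
        level = [tagged]
        for step in steps:
            level = [child for parent in level for child in step(parent)]
        return level
    return crawl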
Usage with Spark RDDs¶
Everything has been done locally so far, but the same crawlers and exploders can be fed to distributed data processing frameworks like Spark.
Here's what the same crawl might look like using the Spark RDD API with flatMap.
import pysparkling
sc = pysparkling.Context()
rdd = sc.parallelize([tagged])
rdd.flatMap(explode_with(color_exploder)).flatMap(explode_with(year_exploder)).collect()
[('s3://fake-bucket/fake-prefix/red/2025', {'global_attr': 42, 'color': 'red', 'year': 2025}), ('s3://fake-bucket/fake-prefix/red/2024', {'global_attr': 42, 'color': 'red', 'year': 2024}), ('s3://fake-bucket/fake-prefix/green/2025', {'global_attr': 42, 'color': 'green', 'year': 2025}), ('s3://fake-bucket/fake-prefix/green/2024', {'global_attr': 42, 'color': 'green', 'year': 2024}), ('s3://fake-bucket/fake-prefix/blue/2025', {'global_attr': 42, 'color': 'blue', 'year': 2025}), ('s3://fake-bucket/fake-prefix/blue/2024', {'global_attr': 42, 'color': 'blue', 'year': 2024})]
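Note that this demo uses pysparkling, a pure-Python implementation of the Spark API, so it runs without a cluster. With a real PySpark installation (an assumption here, not part of the demo environment) the same pattern would read:
# Hypothetical equivalent with real PySpark; assumes pyspark is installed locally.
# New variable names are used so the pysparkling `sc`/`rdd` above are left untouched.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark_rdd = spark.sparkContext.parallelize([tagged])
spark_rdd.flatMap(explode_with(color_exploder)).flatMap(explode_with(year_exploder)).collect()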
Modular crawlers¶
explode_with is un-opinionated about how you obtain your crawlers.
Users are free to implement their own modularity strategies.
However, sparkrawl includes a few functional programming (FP) utilities that help build exploders without writing as many custom functions.
These utilities are totally optional, but can be helpful in some instances.
Here's the same crawl with different but equivalent exploder definitions, now using minimal custom functions.
# Build branch-level exploders from primitives, without ad hoc function defs.
color_exploder = pipeline(key_only, fake_s3.list_prefixes, for_each(with_attribs(color=uri_name)))
year_exploder = pipeline(key_only, fake_s3.list_prefixes, for_each(with_attribs(year=pipeline(uri_name, int))))
rdd.flatMap(explode_with(color_exploder)).flatMap(explode_with(year_exploder)).collect()
[('s3://fake-bucket/fake-prefix/red/2025', {'global_attr': 42, 'color': 'red', 'year': 2025}), ('s3://fake-bucket/fake-prefix/red/2024', {'global_attr': 42, 'color': 'red', 'year': 2024}), ('s3://fake-bucket/fake-prefix/green/2025', {'global_attr': 42, 'color': 'green', 'year': 2025}), ('s3://fake-bucket/fake-prefix/green/2024', {'global_attr': 42, 'color': 'green', 'year': 2024}), ('s3://fake-bucket/fake-prefix/blue/2025', {'global_attr': 42, 'color': 'blue', 'year': 2025}), ('s3://fake-bucket/fake-prefix/blue/2024', {'global_attr': 42, 'color': 'blue', 'year': 2024})]
We have just introduced pipeline, key_only, for_each, and with_attribs. These are just lightweight functional programming constructs:
- pipeline(*fn_seq) applies each function in the sequence to the output of the previous one, starting with the input.
- key_only returns the key of a key-value pair.
- for_each applies its function argument to each element of an input collection.
- with_attribs(attrib_fns) applies a dictionary of functions to its input to derive a dictionary of attributes to tag it with.
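For intuition, minimal sketches of these helpers might look like the following (illustrative only; sparkrawl's actual definitions may differ):
# Illustrative sketches of the FP helpers; not sparkrawl's actual code.
def pipeline_sketch(*fn_seq):
    def run(x):
        for fn in fn_seq:
            x = fn(x)
        return x
    return run

def key_only_sketch(tagged):
    key, _attrs = tagged
    return key

def for_each_sketch(fn):
    def run(coll):
        return [fn(item) for item in coll]
    return run

def with_attribs_sketch(attrib_fns=None, **kw_fns):
    fns = {**(attrib_fns or {}), **kw_fns}
    def tag(x):
        return x, {name: fn(x) for name, fn in fns.items()}
    return tag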
A few other useful functions can be found in the API docs.
One thing the new modular definitions of the two exploders make clear is that it would be very easy to swap out our fake S3 client, e.g., for a real boto one.
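For example, a hypothetical boto3-backed replacement for fake_s3.list_prefixes might look like this (a sketch assuming AWS credentials are configured; pagination omitted):
# Hypothetical boto3-backed replacement for fake_s3.list_prefixes (sketch only; pagination omitted).
import boto3

def boto_list_prefixes(uri: str):
    bucket, _, key_prefix = uri[len("s3://"):].partition("/")
    resp = boto3.client("s3").list_objects_v2(
        Bucket=bucket,
        Prefix=key_prefix.rstrip("/") + "/",
        Delimiter="/",
    )
    return [f"s3://{bucket}/{p['Prefix'].rstrip('/')}" for p in resp.get("CommonPrefixes", [])]

# e.g. color_exploder = pipeline(key_only, boto_list_prefixes, for_each(with_attribs(color=uri_name)))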
Iterating the data itself¶
So far we have only crawled to depth 2. However, we now have the tools to build a crawler that reaches the data itself.
Below, we build a full data RDD in Spark.
def parquet_attribs(uri):
    """
    "s3://..../size=small" -> {"size": "small"}
    """
    attrib, value = uri_name(uri).split("=", maxsplit=1)
    return {attrib: value}
parquet_exploder = pipeline(key_only, fake_s3.list_prefixes, for_each(compute_value(parquet_attribs)))
partition_exploder = pipeline(key_only, fake_s3.list_objects, for_each(with_attribs())) # empty attribs
def read_s3_json(uri):
    with tempfile.NamedTemporaryFile() as f:
        fake_s3.get(uri, f.name)
        yield from read_jsonlines(Path(f.name))
record_exploder = pipeline(key_only, read_s3_json, for_each(key_by_none)) # key_by_none tags None with the input dict
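The two remaining helpers here are key_by_none and drop_key. Roughly, and as illustrative sketches only (not sparkrawl's actual code):
# Illustrative sketches of key_by_none and drop_key; the real functions may differ.
def key_by_none_sketch(record):
    # The record becomes the attrs of a keyless tagged element, so explode_with can merge into it.
    return None, record

def drop_key_sketch(tagged):
    # Keep only the (merged) attrs dict.
    _key, attrs = tagged
    return attrs

assert key_by_none_sketch({"x": 0.5}) == (None, {"x": 0.5})
assert drop_key_sketch((None, {"x": 0.5})) == {"x": 0.5}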
record_rdd = (rdd
    .flatMap(explode_with(color_exploder))
    .flatMap(explode_with(year_exploder))
    .flatMap(explode_with(parquet_exploder))
    .flatMap(explode_with(partition_exploder))
    .flatMap(explode_with(record_exploder))
    .map(drop_key)
)
record_rdd.take(5)
[{'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small', 'x': 0.5642365541274641}, {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small', 'x': 0.8313229586808745}, {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small', 'x': 0.7989672717299621}, {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small', 'x': 0.8330906190954932}, {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small', 'x': 0.5430766499556332}]
Now each record contains all the fields obtained during the crawl to locate it.
Reconfiguration of modular crawlers¶
A nice thing about the modular approach is that stages can easily be split up or recombined. For example, sometimes we wish to collect the files to process in one phase, then process the data itself in another.
In fact, with Spark we will sometimes collect URIs back to the driver so we can redistribute them more evenly across workers:
# Crawl to collect all data files.
file_crawler = fan_out(
    explode_with(color_exploder),
    explode_with(year_exploder),
    explode_with(parquet_exploder),
    explode_with(partition_exploder)
)
files_with_metadata = rdd.flatMap(file_crawler).collect()
def take(coll, n):
    return [
        i for i, _ in zip(coll, range(n))
    ]
print("Some tagged files..")
print(take(files_with_metadata, 5))
print()
# Redistribute data files and extract contents.
tagged_files_rdd = sc.parallelize(files_with_metadata)
record_rdd = tagged_files_rdd.flatMap(explode_with(record_exploder)).map(drop_key)
# Show a random sample.
count = record_rdd.count()
p = 10 / count
print("Some records..")
record_rdd.sample(withReplacement=False, fraction=p).collect()[:5]
Some tagged files..
[('s3://fake-bucket/fake-prefix/red/2025/size=small/3.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=small/2.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=small/0.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=small/1.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'small'}), ('s3://fake-bucket/fake-prefix/red/2025/size=large/3.jsonl', {'global_attr': 42, 'color': 'red', 'year': 2025, 'size': 'large'})]

Some records..
[{'global_attr': 42, 'color': 'red', 'year': 2024, 'size': 'small', 'x': 0.15904196615867394}, {'global_attr': 42, 'color': 'red', 'year': 2024, 'size': 'small', 'x': 0.864855951488156}, {'global_attr': 42, 'color': 'red', 'year': 2024, 'size': 'small', 'x': 0.7143011033179311}, {'global_attr': 42, 'color': 'red', 'year': 2024, 'size': 'small', 'x': 0.7875198451927558}, {'global_attr': 42, 'color': 'green', 'year': 2025, 'size': 'small', 'x': 0.4198504961792958}]
With pandas¶
sparkrawl also works on pandas.DataFrames, with the explode_df function.
Starting with the root URI in a pandas DataFrame, along with any global attributes..
import pandas as pd
uri, attrs = tagged
records = [dict(uri=uri, **attrs)]
pdf = pd.DataFrame.from_records(records)
pdf
| | uri | global_attr |
|---|---|---|
| 0 | s3://fake-bucket/fake-prefix | 42 |
...we can "explode" the dataframe with the same file_crawler
used above.
file_pdf = explode_df(
    pdf,  # The source DF
    "uri",  # The name of the column to use as parent
    pipeline(file_crawler, for_each(inject_key("file_uri")))  # The file crawler above, with output transformed into a record iterator
)
file_pdf.head(5)
| | global_attr | color | year | size | file_uri |
|---|---|---|---|---|---|
| 0 | 42 | red | 2025 | small | s3://fake-bucket/fake-prefix/red/2025/size=sma... |
| 1 | 42 | red | 2025 | small | s3://fake-bucket/fake-prefix/red/2025/size=sma... |
| 2 | 42 | red | 2025 | small | s3://fake-bucket/fake-prefix/red/2025/size=sma... |
| 3 | 42 | red | 2025 | small | s3://fake-bucket/fake-prefix/red/2025/size=sma... |
| 4 | 42 | red | 2025 | large | s3://fake-bucket/fake-prefix/red/2025/size=lar... |
The only new utility here is inject_key, which inserts the key of a tagged element into its attributes dict.
import copy
print("Tagged: ", tagged)
print("Injected: ", inject_key("file_uri")(copy.deepcopy(tagged))) # `inject_key` modifies the input dict for performance reasons
Tagged:  ('s3://fake-bucket/fake-prefix', {'global_attr': 42})
Injected:  {'global_attr': 42, 'file_uri': 's3://fake-bucket/fake-prefix'}
...and Spark DataFrames too¶
sparkrawl also provides a Spark-enabled version of the same method.
from pysparkling.sql.session import SparkSession
sess = SparkSession(sc)
file_sdf = sess.createDataFrame(file_pdf)
record_sdf = explode_spark_df(
    file_sdf,
    "file_uri",
    pipeline(explode_with(record_exploder), for_each(drop_key))
)
record_sdf.show(10)
+-----------+-----+----+-----+-------------------+
|global_attr|color|year| size|                  x|
+-----------+-----+----+-----+-------------------+
|         42|  red|2025|small| 0.5642365541274641|
|         42|  red|2025|small| 0.8313229586808745|
|         42|  red|2025|small| 0.7989672717299621|
|         42|  red|2025|small| 0.8330906190954932|
|         42|  red|2025|small| 0.5430766499556332|
|         42|  red|2025|small|0.04101010757884138|
|         42|  red|2025|small| 0.1973572306717718|
|         42|  red|2025|small| 0.4933307365903946|
|         42|  red|2025|small| 0.5283189997965133|
|         42|  red|2025|small|0.24376168717000823|
+-----------+-----+----+-----+-------------------+
only showing top 10 rows
Appendix: Functional Utility Examples¶
# AKA https://toolz.readthedocs.io/en/latest/api.html#toolz.functoolz.compose_left
assert pipeline(range, sum, lambda x: x ** 2)(4) == sum(range(4)) ** 2
tup = 5, {"ignore": "me"}
print(tup, key_only(tup))
# Note, `as_exploder` from above can be equivalently expressed in terms of `key_only`.
as_exploder_ = lambda key_iter_fn: pipeline(key_only, key_iter_fn)
color_exploder_ = as_exploder_(list_prefixes_with_color_metadata)
assert list(color_exploder_(tagged)) == list(color_exploder(tagged))
(5, {'ignore': 'me'}) 5
with_strlen = with_attribs(dict(strlen=lambda string: len(string)))
print(with_strlen("hello"))
# and `for_each`
each_with_strlen = for_each(with_strlen)
print(list(each_with_strlen(["hello", "again!"])))
('hello', {'strlen': 5})
[('hello', {'strlen': 5}), ('again!', {'strlen': 6})]