Why I’m Loving Spark 4’s Python Data Source (with Direct Arrow Batches)

TL;DR: Apache Spark 4 lets you build first-class data sources in pure Python. If your reader yields Arrow RecordBatch objects, Spark ingests them with reduced Python↔JVM serialization overhead. I used this to ship a ROOT data format reader for PySpark.

A PySpark reader for the ROOT format

ROOT is the de facto data format across High-Energy Physics; CERN experiments alone store over an exabyte of data in ROOT files. With Spark 4's Python Data Source API, building a ROOT reader becomes a focused Python exercise: parse with Uproot/Awkward, produce a PyArrow Table, yield RecordBatch objects, and let Spark do the rest.


Yield Arrow batches, skip the row-by-row serialization tax

Spark 4 introduces a Python Data Source API that makes it simple to ingest data via Python libraries. The basic version yields Python rows (incurring per-row Python↔JVM hops). With direct Arrow batch support (SPARK-48493), your DataSourceReader.read() can yield pyarrow.RecordBatch objects, so Spark ingests columnar data in batches, dramatically reducing cross-boundary overhead.
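
To make the difference concrete, here is a minimal, schematic sketch of the two styles of read(); the class and column names are illustrative only and are not part of the ROOT reader:

import pyarrow as pa
from pyspark.sql.datasource import DataSourceReader

class RowReader(DataSourceReader):
    def read(self, partition):
        # Baseline: yield plain Python tuples, one row at a time;
        # every row pays a Python<->JVM serialization cost.
        for i in range(1000):
            yield (i, float(i) * 0.5)

class ArrowBatchReader(DataSourceReader):
    def read(self, partition):
        # With SPARK-48493: yield pyarrow.RecordBatch objects so Spark
        # ingests whole columnar batches instead of individual rows.
        ids = pa.array(range(1000))
        values = pa.array([float(i) * 0.5 for i in range(1000)])
        yield pa.record_batch([ids, values], names=["id", "value"])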


ROOT → Arrow → Spark in a few moving parts

Here’s the essence of the ROOT data format reader I implemented and packaged in pyspark-root-datasource:

  1. Subclass the DataSource and DataSourceReader from pyspark.sql.datasource.

  2. In read(partition), produce pyarrow.RecordBatch objects.

  3. Register your source with spark.dataSource.register(...), then spark.read.format("root")....

Minimal, schematic code (pared down for clarity):

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class RootDataSource(DataSource):
    @classmethod
    def name(cls):
        return "root"
    def schema(self):
        # Return a Spark schema (string or StructType) or infer in the reader.
        return "nMuon int, Muon_pt array<float>, Muon_eta array<float>"
    def reader(self, schema):
        return RootReader(schema, self.options)

class RootReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.path = options.get("path")
        self.tree = options.get("tree", "Events")
        self.step_size = int(options.get("step_size", 1_000_000))

    def partitions(self):
        # Example: split work into N partitions
        N = 8
        return [InputPartition(i) for i in range(N)]

    def read(self, partition):
        import uproot, awkward as ak
        # partition.value holds the index i set in partitions(); map it to an entry range.
        start = partition.value * self.step_size
        stop  = start + self.step_size
        with uproot.open(self.path) as f:
            arrays = f[self.tree].arrays(entry_start=start, entry_stop=stop, library="ak")
        # Awkward -> PyArrow Table, then hand Spark the columnar data directly.
        table = ak.to_arrow_table(arrays, list_to32=True)
        for batch in table.to_batches():   # <-- yield Arrow RecordBatches directly
            yield batch
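
With those two classes in place, registration (step 3 above) is a one-liner. A schematic usage sketch, assuming the classes above are available to the Spark session:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("root-reader-demo").getOrCreate()

# Register the Python data source; it becomes available under its name(), "root".
spark.dataSource.register(RootDataSource)

df = (spark.read.format("root")
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .load())
df.show(3)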
 


Why this felt easy

  • Tiny surface area: one DataSource, one DataSourceReader, and a read() generator. The Apache Spark documentation walks through the exact hooks and options.

  • Python-native stack: I could leverage existing Python libraries, Uproot + Awkward + PyArrow, for reading the ROOT format and producing Arrow data, with no additional Scala/Java code.

  • Arrow batches = speed: SPARK-48493 (issues.apache.org) wires Arrow batches into the reader path to avoid per-row conversion.


Practical tips

  • Provide a schema if you can. It enables early pruning and tighter I/O; the Spark guide covers schema handling and type conversions. A short sketch illustrating these tips follows the list.

  • Partitioning matters. ROOT TTrees can be large and jagged: tune step_size to balance task parallelism and batch size. (The reader exposes options for this.)

  • Arrow knobs. For very large datasets, controlling Arrow batch sizes can improve downstream pipeline behavior; see Spark’s Arrow notes.
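
As an illustration of these tips, the sketch below passes an explicit StructType schema (equivalent to the DDL string used earlier) and tunes step_size; the values shown are arbitrary examples, not tuned recommendations:

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, ArrayType

schema = StructType([
    StructField("nMuon", IntegerType()),
    StructField("Muon_pt", ArrayType(FloatType())),
    StructField("Muon_eta", ArrayType(FloatType())),
])

df = (spark.read.format("root")
      .schema(schema)
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "500000")   # smaller steps -> more, smaller partitions/batches
      .load())

# On the Arrow side, batch sizes can also be capped inside a reader, e.g.:
#   for batch in table.to_batches(max_chunksize=64_000):
#       yield batch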


“Show me the full thing”

If you want a working reader with options for local files, directories, globs, and the XRootD protocol (root://), check out the package at: pyspark-root-datasource

Quick start

1. Install

pip install pyspark-root-datasource
 

2. Grab an example ROOT file:

wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/Run2012BC_DoubleMuParked_Muons.root
 
3. Read with PySpark (Python code):
 
from pyspark.sql import SparkSession
from pyspark_root_datasource import register

spark = SparkSession.builder.appName("ROOT via PySpark").getOrCreate()
register(spark)  # registers "root" format

schema = "nMuon int, Muon_pt array<float>, Muon_eta array<float>, Muon_phi array<float>"
df = (spark.read.format("root")
      .schema(schema)
      .option("path", "Run2012BC_DoubleMuParked_Muons.root")
      .option("tree", "Events")
      .option("step_size", "1000000")
      .load())

df.printSchema()
df.show(3, truncate=False)
df.count()
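
The same read pattern extends to the other path types the package supports; the paths below are placeholders, and the root:// example additionally assumes fsspec-xrootd is installed and configured:

# Glob over local ROOT files (placeholder path).
df_glob = (spark.read.format("root")
           .schema(schema)
           .option("path", "data/*.root")
           .option("tree", "Events")
           .load())

# XRootD access (placeholder URL; requires fsspec-xrootd).
df_remote = (spark.read.format("root")
             .schema(schema)
             .option("path", "root://eospublic.cern.ch//eos/opendata/path/to/file.root")
             .option("tree", "Events")
             .load())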


A tiny physics “hello world”

Once the ROOT data is loaded, you can compute the dimuon mass spectrum from CERN Open Data in just a few lines: a quick smoke test and a classic HEP demo. Run the basic analysis here: Notebook link.
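As a sketch of what that analysis can look like, continuing from the quick-start df and assuming Muon_charge array<int> is added to the schema (it is not in the schema shown above), the invariant mass of opposite-charge muon pairs can be computed with Spark SQL functions in the massless-muon approximation:

from pyspark.sql import functions as F

# Keep events with exactly two muons of opposite charge.
dimuons = (df
    .filter(F.col("nMuon") == 2)
    .filter(F.element_at("Muon_charge", 1) != F.element_at("Muon_charge", 2)))

# M = sqrt(2 * pt1 * pt2 * (cosh(eta1 - eta2) - cos(phi1 - phi2)))
mass = F.sqrt(
    2 * F.element_at("Muon_pt", 1) * F.element_at("Muon_pt", 2)
    * (F.cosh(F.element_at("Muon_eta", 1) - F.element_at("Muon_eta", 2))
       - F.cos(F.element_at("Muon_phi", 1) - F.element_at("Muon_phi", 2))))

spectrum = dimuons.withColumn("dimuon_mass", mass)
spectrum.select("dimuon_mass").show(5)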

The dimuon mass spectrum analysis, a "Hello World!" example for data analysis in High-Energy Physics. This image is the output of a notebook analysis using CERN Open Data and pyspark-root-datasource.

 

Beyond HEP: a general recipe

The same approach works for any niche or binary format:

  1. Read bytes with your Python parser.

  2. Build a PyArrow Table.

  3. Yield RecordBatch objects from read().

Spark handles distribution, schema handling, projection, joins, and writes, so "we should write a connector" becomes a focused Python task rather than a long JVM project. A generic skeleton is sketched below.
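
In this sketch, MyFormatDataSource and parse_my_format are hypothetical placeholders standing in for your own format and parser, not an existing library:

import pyarrow as pa
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

def parse_my_format(path):
    # Placeholder parser: replace with real format-specific decoding of the bytes.
    return [{"id": 1, "payload": "a"}, {"id": 2, "payload": "b"}]

class MyFormatDataSource(DataSource):
    @classmethod
    def name(cls):
        return "myformat"
    def schema(self):
        return "id bigint, payload string"
    def reader(self, schema):
        return MyFormatReader(schema, self.options)

class MyFormatReader(DataSourceReader):
    def __init__(self, schema, options):
        self.path = options.get("path")
    def partitions(self):
        # A real reader would return one partition per file or chunk.
        return [InputPartition(0)]
    def read(self, partition):
        records = parse_my_format(self.path)      # 1. read bytes with a Python parser
        table = pa.Table.from_pylist(records)     # 2. build a PyArrow Table
        yield from table.to_batches()             # 3. yield Arrow RecordBatches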


Performance note

Python data sources still cross the Python↔JVM boundary. Direct Arrow batches greatly reduce the serialization tax and improve throughput; in some tests this showed a roughly 10x performance gain over a basic Python data source that sends data row by row. However, if you need absolute peak performance, a native JVM DataSource V2 implementation will usually win.


Limitations

This is a Python Data Source, and a few limitations apply:

  • Even with direct Arrow RecordBatch support, it won't match a native Scala/Java V2 connector for raw throughput or advanced pushdown; projection and predicate pushdown are limited.

  • Schema inference on large TTrees is slow and may widen types; prefer explicit schemas.

  • ROOT quirks apply: unsigned integers must be cast to Spark's signed types, and deeply nested or jagged arrays may need options such as extension arrays to stay practical.

  • Watch memory: oversized Arrow batches or partitions can OOM executors; tune step_size and the partition count.

  • Data access with the XRootD protocol works but requires additional configuration; misconfigured dependencies (uproot, awkward, pyarrow, fsspec-xrootd) are a common failure point.

  • At this stage the data source is read-only (no writer) and does not support streaming; multi-file datasets with inconsistent schemas may require pre-validation or per-file options.


Acknowledgements

Many thanks to ATLAS colleagues, especially the ADAM (Atlas Data and Metadata) team, and to the CERN Databases and Data Analytics group for their support with Hadoop, Spark, and the SWAN hosted notebook services. We are also grateful to Allison Wang for assistance with SPARK-48493, and to Jim Pivarski for guidance on using Uproot and Awkward Array.

