Diving into Spark and Parquet Workloads, by Example

Topic: In this post you can find a few simple examples illustrating important features of Spark when reading partitioned tables stored in Parquet, with a particular focus on performance investigations. The main topics covered are:

  • Partition pruning
  • Column projection
  • Predicate/filter push-down
  • Tools for investigating Parquet metadata
  • Tools for measuring Spark metrics

 

Motivations: The combination of Spark and Parquet is currently a very popular foundation for building scalable analytics platforms; at least, this is what we find in several projects at the CERN Hadoop and Spark service. In particular, performance, scalability and ease of use are key elements of this solution that make it very appealing to our users. This also motivates the work described in this post: exploring the technology, and in particular the performance characteristics of Spark workloads and the internals of Parquet, to better understand what happens under the hood, what works well and what some of the current limitations are.

 

The lab environment

This post is based on examples and short pieces of code that you should be able to try out in your test environments and use to experiment on why something works (or why it doesn't). In this post I have used a table from the schema of the TPCDS benchmark. The setup instructions for TPCDS on Spark can be found on GitHub: "Spark SQL Performance Tests".

 

When testing the examples and measuring performance for this post, I have mostly used Spark on a YARN/Hadoop cluster of 12 nodes; however, this is not a hard dependency: you can run your tests with similar results using the local filesystem and/or Spark in local mode. I have run most of the examples using spark-shell; however, the examples use Spark SQL, so in most cases they will run unchanged on PySpark and/or in a notebook environment.

The labs discussed in this post have been tested using Spark version 2.2.0 (Release Candidate) and on Spark 2.1.1. Spark 2.2.0 depends on Parquet libraries version 1.8.2 (Spark 2.1.1 uses Parquet 1.8.1).

 

Test table in Parquet

The examples presented here use the TPCDS schema created with scale 1500 GB. Only one table is used in the examples: I have chosen the largest fact table, STORE_SALES. The table is partitioned and, after the schema installation, is physically stored as a collection of Parquet files organized under a root directory.

The total size is 185 GB in my lab environment. It adds up to 556 GB considering the 3-fold HDFS replication factor. This can be measured with:

 

$ hdfs dfs -du -h -s TPCDS/tpcds_1500/store_sales

185.3 G  556.0 G  TPCDS/tpcds_1500/store_sales

 

The partitioning structure is visible when listing the contents of the top-level directory. There are 1824 partitions in the test schema I used. Each partition is organized in a separate folder, whose name contains the partition key. The files are compressed with snappy. The size of each individual file varies depending on the amount of data in the given partition. Here is an example of the path and size of one of the files that constitute the store_sales table in my test schema:

 

Name:  TPCDS/tpcds_1500/store_sales/ss_sold_date_sk=2452621/part-00077-57653b27-17f1-4069-85f2-7d7adf7ab7df.snappy.parquet

Bytes: 208004821
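
The full list of partition directories can be displayed with a simple listing of the table's root directory; each entry is a folder named after the partitioning key and its value (for example .../store_sales/ss_sold_date_sk=2452621). This is a minimal sketch, with the paths of my test schema:

$ hdfs dfs -ls TPCDS/tpcds_1500/store_sales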

  

Spark and Parquet

Partitioning is a feature of many databases and data processing frameworks and it is key to making Spark jobs work at scale. Spark deals in a straightforward manner with partitioned tables in Parquet. The STORE_SALES table from the TPCDS schema described in the previous paragraph is an example of how partitioning is implemented on a filesystem (HDFS in this case). Spark can read tables stored in Parquet and performs partition discovery with a straightforward API. This is an example of how to read the STORE_SALES table into a Spark DataFrame:

 

val df = spark.read.
              format("parquet").
              load("TPCDS/tpcds_1500/store_sales")

 

This is an example of how to write a Spark DataFrame df into Parquet files preserving the partitioning (following the example of the STORE_SALES table, the partitioning key is ss_sold_date_sk):

 

df.write.
  partitionBy("ss_sold_date_sk").
  parquet("TPCDS/tpcds_1500/store_sales_copy")

   

Partition pruning

Let's start the exploration with something simple: partition pruning. This feature, common to most systems implementing partitioning, can speed up your workloads considerably by reducing the amount of I/O needed to process your query/data access code. The underlying idea behind partition pruning, at least in its simplest form for single-table access as in the example discussed here, is to read data only from a list of partitions, based on a filter on the partitioning key, skipping the rest. An example to illustrate the concept is query (2) (see below if you want to peek ahead). Before getting to that, however, I want to introduce a "baseline workload" with the goal of having a reference point for comparing the results of all the optimizations that we are going to test in this post.

 

1. Baseline workload: full scan of the table, no partition pruning

 

I have used spark-shell on a YARN cluster in my tests. You can also reproduce this in local mode (i.e. using --master local[*]) and/or use pyspark or a notebook to run the tests if you prefer. An example of how to start spark-shell (customize as relevant for your environment) is:

 

$ spark-shell --num-executors 12 --executor-cores 4 --executor-memory 4g 

 

The next step is to use the Spark DataFrame API to lazily read the files from Parquet and register the resulting DataFrame as a temporary view in Spark.

 

spark.read.format("parquet").load("TPCDS/tpcds_1500/store_sales").createOrReplaceTempView("store_sales")

 

This Spark SQL command causes a full scan of all the partitions of the table store_sales, and we are going to use it as the "baseline workload" for the purposes of this post.

 

// query (1), this is a full scan of the table store_sales
spark.sql("select * from store_sales where ss_sales_price=-1.0").collect()

 

The query reads about 185 GB of data and 4.3 billion rows in my tests. You can find these performance metrics in the Spark Web UI (look for the stage metrics Input Size / Records) or with the tool sparkMeasure, discussed later in this post. Here are the key metrics measured on a test of query (1):

 

  • Total Time Across All Tasks: 59 min
  • Locality Level Summary: Node local: 1675
  • Input Size / Records: 185.3 GB / 4319943621
  • Duration: 1.3 min            
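
The same metrics can also be collected programmatically with sparkMeasure, discussed later in this post. The following is a minimal sketch: it assumes that the sparkMeasure package has been added to the spark-shell session (for example with the --packages option) and simply wraps query (1) with the stage-metrics instrumentation:

// collect and print stage-level metrics for query (1) using sparkMeasure
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select * from store_sales where ss_sales_price=-1.0").collect())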

The execution plan of the query shows how Spark executes the query: it distributes the reading of all the partitions/files on HDFS to the executors, then applies the filter "ss_sales_price = -1.0" and finally collects the result set into the driver. Note that the execution plan can be found in the Web UI or by using the explain method on the Spark DataFrame.
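
For example, this is how to print the physical plan of query (1) from spark-shell (the exact layout of the output depends on the Spark version):

// print the execution plan of query (1)
spark.sql("select * from store_sales where ss_sales_price=-1.0").explain()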


Note: at this stage, you can ignore the where clause "ss_sales_price = -1.0" in the test query. It is there so that an empty result set is returned by the query (no sales price is negative), rather than returning 185 GB of data and flooding the driver! What happens when executing query (1) is that Spark has to scan all the table partitions (files) from Parquet before applying the filter "ss_sales_price = -1.0", so this example is still a valid illustration of the baseline workload of a full table scan. Later in this post, you will find more details about how and why this works in the discussion on predicate push-down.

   

2. Query with a filter on the partitioning key and using partition pruning

 

This is an example of a query where Spark SQL can use partition pruning. The query is similar to the baseline query (1), but with the notable addition of a filter on the partitioning key. The query can be executed by reading only one partition of the STORE_SALES table.

 

// query (2), example of partition pruning
spark.sql("select * from store_sales where ss_sold_date_sk=2452621 and ss_sales_price=-1.0").collect()

 

The execution of query (2) is much faster than the full scan case of query (1). Obviously, the text of the query and the filters are different, but the main point I want to stress is that only one partition needs to be read in the case of query (2), and this makes a huge difference. Web UI metrics show that query (2) needs to read only 198.6 MB of data (4502609 records).

 

  • Total Time Across All Tasks: 6 s
  • Locality Level Summary: Node local: 48
  • Input Size / Records: 198.6 MB / 4502609 
  • Duration: 3 s                        

The execution plan confirms that the filter on the partitioning key "ss_sold_date_sk=2452621" has been "pushed down" to the scan of the table, which uses this information to read only the files containing data for that partition. Note: as in the previous example, the additional filter "ss_sales_price = -1.0" is there to return an empty set rather than filling the driver with the result set; as previously commented, this can be ignored for the purposes of comparing full scan vs. partition pruning, as that filter is only evaluated after Spark has read the partition.
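
To check this from spark-shell, you can again print the physical plan; in Spark 2.x the FileScan node of the plan typically reports the partition filter (in the PartitionFilters field) separately from the other pushed-down filters. This is a minimal sketch; the exact output depends on the Spark version:

// print the execution plan of query (2) and look for the partition filter in the FileScan node
spark.sql("select * from store_sales where ss_sold_date_sk=2452621 and ss_sales_price=-1.0").explain()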