Experiences of Using Alluxio with Spark

Introduction

Alluxio refers to itself as an "Open Source Memory Speed Virtual Distributed Storage" platform. It sits between the storage and processing framework layers in the distributed computing ecosystem and claims to heavily improve performance when multiple jobs are reading/writing from/to the same data. This post will cover some of the basic features of Alluxio and will compare its performance for accessing data against caching in Spark.

Spark Caching

Caching (and persistence in general) in Spark is intended for data that will be accessed frequently within an application's lifecycle. However, persisted datasets cannot be shared between applications. I will use Spark caching as the benchmark against which to compare the performance of Alluxio's in-memory storage.
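As a minimal sketch of the API (assuming an existing SparkContext sc, as in the Spark shell, and using the test file described below), a dataset can be persisted with an explicit storage level and released again with unpersist:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs://hostname.cern.ch:8020/alluxio/bigFile") // Reference a file in HDFS.
lines.persist(StorageLevel.MEMORY_ONLY)                                 // Equivalent to lines.cache.
lines.count                                                             // First action materialises the cached blocks.
lines.count                                                             // Later actions reuse them.
lines.unpersist()                                                       // Release the blocks; they were never visible to other applications.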

For my test jobs I will use a 3300MB text file stored in HDFS. Alluxio is mounted to the /alluxio directory of this HDFS.

 hdfs dfs -put /root/bigFile /alluxio/bigFile

Now I can test access speeds to a cached copy of this file in the Spark shell.

 spark-shell --master yarn --executor-memory 3500MB --num-executors 5 --executor-cores 3

I now have an application running in the Spark shell with 21GB of resources: 1GB for the application master and around 20GB for the executors.


I will run a simple filter job 40 times and record the total time taken to complete it.

def time[R](block: => R): R = {                                         // Function to time actions.
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0) + "ns")
    result
 }

val file = sc.textFile("hdfs://hostname.cern.ch:8020/alluxio/bigFile")  // Reference the file in HDFS.
file.cache                                                              // Mark it to be cached in memory.
file.filter(line => line.contains("BALTIMORE")).collect                 // First action materialises the cache.

time( for ( a <- 1 to 40) {                                             // Run filter 40 times on cached file.
       file.filter(line => line.contains("BALTIMORE")).collect
    }
 )

The time taken to complete the filters on the cached file was 14.7 seconds.

Using Alluxio with Spark

Instead of caching the file in Spark, we will now store it in Alluxio's memory. Worker memory is allocated to Alluxio in the alluxio_home/conf/alluxio-env.sh config file:

ALLUXIO_WORKER_MEMORY_SIZE=${ALLUXIO_WORKER_MEMORY_SIZE:-"1530MB"}

This change can be made on the master, synced to the workers, and Alluxio then restarted so that the new memory allocation is mounted:

alluxio copyDir conf

alluxio-start.sh all Mount

As the file is already in HDFS under the directory on which Alluxio is mounted, it is visible in Alluxio's namespace with 0% of it in memory. I can also see that all of the workers are available with the correct allocated resources.

I'll load a version of the file into Alluxio memory using Spark and then perform the same job as before.

val file = sc.textFile("hdfs://hostname.cern.ch:8020/alluxio/bigFile") // Reference the file stored on disk (out of memory).
file.saveAsTextFile("alluxio://hostname.cern.ch:19998/bigFile1")       // Store a copy of the file in Alluxio memory.
val file1 = sc.textFile("alluxio://hostname.cern.ch:19998/bigFile1")   // Reference this in-memory copy.
val filteredFile = file1.filter(line => line.contains("BALTIMORE"))    // Define the same filter as before.

time( for ( a <- 1 to 40) {                                             // Run the filter 40 times on the in-memory copy.
       filteredFile.collect
    }
 )

The time taken to complete the filters was 47.8 seconds.

Results

To summarise the results obtained above: the filters over the Spark-cached file took 14.7 seconds, while the filters over the copy held in Alluxio memory took 47.8 seconds. I also ran the test against the file stored on an HDD-backed HDFS with no caching at all, which was significantly slower than either in-memory option.

The difference in speed between Alluxio and Spark caching comes from the overhead Spark incurs each time it connects to the Alluxio filesystem. This effect is particularly pronounced in this test because we run a filter on a small file many times, so Spark connects to the filesystem many times. With much larger files in memory this overhead would be far less noticeable; unfortunately the test environment available does not allow for this.

Further Alluxio Options

Loading into Alluxio Externally

In the previous example I could have loaded the file into Alluxio outside of Spark using the command line interface.

alluxio fs load /bigFile

The behaviour of this loading method differs in a couple of ways. Firstly, given enough space in memory, blocks will be replicated in Alluxio (from the underFS); I believe this is done to minimise time wasted waiting for executors to finish certain processes. I allocated Alluxio more memory for this option so that there was enough room for this replication to occur.
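For reference, this extra allocation was made through the same alluxio-env.sh setting as before; the value below is purely illustrative rather than the exact figure used:

ALLUXIO_WORKER_MEMORY_SIZE=${ALLUXIO_WORKER_MEMORY_SIZE:-"3500MB"}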

Secondly, the file loaded through the CLI is automatically persisted to the underFS, due to the write type specified in Alluxio's configuration file (alluxio-site.properties), whereas the saveAsTextFile method in Spark produces a file which is only present in memory. This difference in persistence behaviour arises because using Alluxio from Spark falls back on Alluxio's default settings unless told otherwise. Alluxio's default write type is MUST_CACHE, so files are only written to Alluxio memory and not to the underFS. The Alluxio CLI, on the other hand, follows the Alluxio user's configuration files, which had already been changed from the default to CACHE_THROUGH. It is possible to change Alluxio's behaviour in Spark from the defaults by passing Alluxio configuration options (from alluxio-site.properties) in as Java options. Persisted files in Alluxio allow Spark to operate in a fault-tolerant manner.
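As a rough sketch of that last point (the property value and resource figures here are assumptions for this setup, not what was actually run), the write type could be overridden when launching the Spark shell by passing the alluxio-site.properties option through as a Java option:

 spark-shell --master yarn --executor-memory 3500MB --num-executors 5 --executor-cores 3 \
   --driver-java-options "-Dalluxio.user.file.writetype.default=CACHE_THROUGH" \
   --conf "spark.executor.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH"

With CACHE_THROUGH set this way, files written from Spark would be persisted to the underFS as well as stored in Alluxio memory.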

The job times were essentially the same as the previous method using Alluxio.


Accessing Data from a Different Job

One of the major advantages of using Alluxio is that cached data can be accessed by multiple jobs. Taking the earlier example, where a file was stored in Alluxio using Spark, I can access that data from another application. Access from a different framework or job should not add any overhead to processing times.
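As a simple sketch (assuming a second spark-shell session on the same cluster), the copy written to Alluxio earlier can be read straight back by its alluxio:// path:

// In a separate Spark application or spark-shell session:
val shared = sc.textFile("alluxio://hostname.cern.ch:19998/bigFile1")   // The copy stored in Alluxio memory earlier.
shared.filter(line => line.contains("BALTIMORE")).count                 // Served from Alluxio, with no need to re-cache in Spark.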

Conclusions

Main Points

  • Alluxio in-memory storage achieves worse performance than Spark caching, but is far better than standard storage methods.
    • The slower performance is due to the overhead of connecting to the Alluxio filesystem from within Spark.
  • Using Spark to store files in Alluxio memory does not utilise replication.
    • Increasing Alluxio's memory allocation does not increase performance in this context.
  • Using Alluxio commands to store files in Alluxio memory can utilise replication (from the underFS).
    • Increasing Alluxio's memory allocation does increase performance in this context, but only once replication has been initiated.
  • Alluxio allows in-memory data to be accessed across applications and frameworks.
    • Spark caching does not allow this.
