Introduction
In part 2 of the 'Integrating Hadoop and Elasticsearch' blog post series, we look at bridging Apache Spark and Elasticsearch. I assume that you have access to Hadoop and Elasticsearch clusters and are faced with the challenge of bridging these two distributed systems. Since Spark code can be written in Scala, Python and Java, we look at the setup, configuration and code snippets across these three languages, both in batch and interactively.
Before reading this blog post, I recommend reading part 1 in this series - Loading into and querying Elasticsearch and Apache Hive
Spark
Apache® Spark™ is a powerful open source processing engine built around speed, ease of use, and sophisticated analytics. Spark is becoming the de facto processing framework for analytics, machine learning and tackling a range of data challenges. Spark has several deployment modes such as Standalone, Mesos and YARN; in this post we focus only on Spark running on a YARN (Hadoop) cluster.
Elasticsearch
Elasticsearch, together with Logstash for log tailing and Kibana for visualisation, is gaining a lot of momentum: it is fairly easy to set up and get started with, and its near real-time search and aggregation capabilities cover many use cases in web analytics and monitoring (log and metric analysis).
Bridging Apache Spark and Elasticsearch
Spark lets you write applications in Scala, Python and Java, and they can be executed interactively (spark-shell, pyspark) as well as in batch mode. We therefore look at the following scenarios, some in detail and some with code snippets that can be elaborated on depending on the use case.
|              | spark-shell | scala | pyspark | python | java |
|--------------|-------------|-------|---------|--------|------|
| read from ES | Y           | Y     | Y       | Y      | Y    |
| write to ES  | Y           | Y     | Y       | Y      | Y    |
Setup and Configuration
The following is the required setup and configuration for pilot and production purposes.
Pilot
If you are getting started with testing the ES-Hadoop connector, then the following setup is suitable:
- Download the ES-Hadoop connector and copy it to /tmp on HDFS
wget -P /tmp http://download.elastic.co/hadoop/elasticsearch-hadoop-2.3.2.zip
unzip /tmp/elasticsearch-hadoop-2.3.2.zip -d /tmp
cp /tmp/elasticsearch-hadoop-2.3.2/dist/elasticsearch-hadoop-2.3.2.jar /tmp/elasticsearch-hadoop-2.3.2.jar
hdfs dfs -copyFromLocal /tmp/elasticsearch-hadoop-2.3.2/dist/elasticsearch-hadoop-2.3.2.jar /tmp
Production
For production use, you can copy the jar file onto the Spark classpath of every node instead of shipping it with each job.
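One common approach (a sketch, assuming the jar has been copied to the same local path on every node; the /opt/es-hadoop path is illustrative) is to reference it from the driver and executor classpath settings in spark-defaults.conf:

# spark-defaults.conf (illustrative path - adjust to your deployment)
spark.driver.extraClassPath   /opt/es-hadoop/elasticsearch-hadoop-2.3.2.jar
spark.executor.extraClassPath /opt/es-hadoop/elasticsearch-hadoop-2.3.2.jar

With the jar on the classpath, the --jars option used in the snippets below is no longer strictly necessary.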
Configuration
For most cases, the following configuration is sufficient:
es.nodes     # list of Elasticsearch nodes, defaults to localhost
es.port      # Elasticsearch port, defaults to 9200
es.resource  # ES index/type, e.g. filebeat-2016.05.17/log
es.query     # ES query, defaults to match_all
Most importantly, if your ES cluster allows access only through client nodes (which is recommended), then the following parameter is necessary:
es.nodes.client.only # routes all requests through the client nodes
The full list of ES-Hadoop connector configuration options can be found here.
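These settings can also be set programmatically on the SparkConf instead of being passed on the command line. Below is a minimal Scala sketch (the application name, node address and index are illustrative) that sets the same properties listed above and then uses the esRDD call introduced in the next section:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

// illustrative values - adjust to your cluster
val conf = new SparkConf().setAppName("EsConfigExample")
conf.set("es.nodes", "ela-n1")            // Elasticsearch node(s) to connect to
conf.set("es.port", "9200")               // REST port
conf.set("es.nodes.client.only", "true")  // route all requests through client nodes

val sc = new SparkContext(conf)

// the connector picks these settings up from the SparkConf
val rdd = sc.esRDD("filebeat-2016.05.17/log")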
All the following code snippets are tested and valid for CDH 5.5 and Spark 1.5.1
spark-shell
reading from ES index
The following code reads an Elasticsearch index and creates a Spark RDD. By default, the documents are returned as a Tuple2 with the document id as the first element and the actual document as the second element.
spark-shell --jars /tmp/elasticsearch-hadoop-2.3.2.jar \
            --conf spark.es.nodes="ela-n1"

// import elasticsearch packages
import org.elasticsearch.spark._

// load elasticsearch index into spark rdd
val fbeat_rdd = sc.esRDD("fmem/log")

// print 10 records of the rdd
fbeat_rdd.take(10).foreach(println)

// count the records in the rdd
fbeat_rdd.count

// map it to a different tuple, with each tuple containing the values only
val fmem_t_rdd = fbeat_rdd.map { case (id, doc) =>
  (doc.get("dt").get, doc.get("server").get.asInstanceOf[String], doc.get("mem").get.asInstanceOf[Long])
}

// display the maximum memory consumed on each server
fmem_t_rdd.map { case (x, y, z) => (y, z) }.reduceByKey((a, b) => math.max(a, b)).collect().foreach(println)
Alternatively, you can do the same thing in a more concise and efficient way using a Spark DataFrame and Spark SQL:
spark-shell --jars /tmp/elasticsearch-hadoop-2.3.2.jar \
            --conf spark.es.nodes="ela-n1"

// load the elasticsearch index into a spark dataframe
val fbeat_df = sqlContext.read.format("org.elasticsearch.spark.sql").load("fmem/log")

// inspect the data
fbeat_df.show(10)

// display the maximum memory consumed on each server
fbeat_df.groupBy("server").max().show()
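To use Spark SQL proper, the DataFrame can be registered as a temporary table and queried with SQL. A short sketch (the table name is arbitrary; the server and mem fields are the ones used in the RDD example above), using the same registerTempTable mechanism as the WriteToES example further down:

// register the dataframe as a temporary table so it can be queried with SQL
fbeat_df.registerTempTable("fmem_table")

// maximum memory consumed on each server, expressed as a SQL query
sqlContext.sql("SELECT server, max(mem) AS max_mem FROM fmem_table GROUP BY server").show()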
writing to ES index
In the following code snippet, we read a csv file (containing the memory consumption of the flume agent on our servers) into an RDD, map it to a schema and persist the RDD as an Elasticsearch index. As mentioned earlier, use es.nodes.client.only=true if your Elasticsearch cluster is configured to route traffic through client nodes.
spark-shell --jars /tmp/elasticsearch-hadoop-2.3.2.jar \
            --conf spark.es.nodes="ela-n1" \
            --conf spark.es.nodes.client.only=true

// import elasticsearch packages
import org.elasticsearch.spark._

// define the schema
case class MemT(dt: String, server: String, memoryused: Integer)

// load the csv file into an rdd
val Memcsv = sc.textFile("/tmp/flume_memusage.csv")

// split the fields, trim and map them to the schema
val MemTrdd = Memcsv.map(line => line.split(",")).map(line => MemT(line(0).trim.toString, line(1).trim.toString, line(2).trim.toInt))

// write the rdd to elasticsearch
MemTrdd.saveToEs("fmem/logs")
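As a side note, the same write can also be done through the DataFrame API. A sketch, assuming the spark-shell session above: the case-class RDD is converted to a DataFrame and saved with the saveToEs method provided by org.elasticsearch.spark.sql (the same import used in the WriteToES example in the next section):

// convert the case-class rdd to a dataframe and write it to elasticsearch
import org.elasticsearch.spark.sql._
import sqlContext.implicits._

val MemTdf = MemTrdd.toDF()
MemTdf.saveToEs("fmem/logs")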
scala
In the previous examples, we have been running our code from the interpreter; let's now put it in a file and run it with spark-submit. This process is a little more involved, but it is also the generic way of submitting Spark applications.
writing to ES index
In the following code snippet, we calculate the logging rate of our sensors on an hourly basis and persist the result into an Elasticsearch index.
Step 1) create the following directory structure
src
|-main
|---scala
Step 2) copy the following code to src/main/scala/
/* Write to ES from Spark (scala) */
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._
import org.apache.spark.SparkConf

object WriteToES {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WriteToES")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // load the sensor parquet file into a dataframe and register it as a temporary table
    val sen_p = sqlContext.read.parquet("/user/hloader/SENSOR.db/eventhistory_00000001/day=2016-04-20/34de184c2921296-67bec2fd236fec8e_1294729350_data.0.parq")
    sen_p.registerTempTable("sensor_ptable")

    // count events per element and hour, and persist the result to the "sensor/metrics" index
    sqlContext.sql("SELECT cast(date_format(ts,'YYYY-MM-dd H') as timestamp) as ts, element_id, count(*) as cnt FROM sensor_ptable group by cast(date_format(ts,'YYYY-MM-dd H') as timestamp), element_id").saveToEs("sensor/metrics")
  }
}
Step 3) Copy the following to SparkToES.sbt
name := "Write to ES" version := "1.0" scalaVersion := "2.10.5" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.2"
Step 4) Package it with sbt (scala build tool)
sbt package
Step 5) Submit the spark application with spark-submit
spark-submit \
  --jars /tmp/elasticsearch-hadoop-2.3.2.jar \
  --class "WriteToES" \
  --conf spark.es.nodes="ela-n1" \
  --conf spark.es.nodes.client.only=true \
  --master yarn-cluster \
  target/scala-2.10/write-to-es_2.10-1.0.jar
reading from ES index
The following code allows you to read from an Elasticsearch index; again, you need to follow the steps mentioned above to package it and submit it to YARN.
/* Read an ES index from Spark (scala) */
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._
import org.apache.spark.SparkConf

object ReadFromES {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ReadFromES")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // load the elasticsearch index into a dataframe and print the document count
    val es_df = sqlContext.read.format("org.elasticsearch.spark.sql").load("filebeat-2016.05.19/log")
    println(es_df.count())
  }
}

sbt package

spark-submit \
  --jars /tmp/elasticsearch-hadoop-2.3.2.jar \
  --class "ReadFromES" \
  --conf spark.es.nodes="ela-n1" \
  --conf spark.es.nodes.client.only=true \
  --master yarn-cluster \
  target/scala-2.10/write-to-es_2.10-1.0.jar
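If you only need a subset of the documents, the es.query setting from the configuration section can be used so that Elasticsearch filters the data before it is shipped to Spark. A sketch (the query string and field are illustrative), passing the query as an option to the DataFrame reader:

// read only the documents matching a query; the query is evaluated by elasticsearch,
// so only matching documents are returned to spark
val filtered_df = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", "?q=server:ela-n1")   // illustrative URI query
  .load("filebeat-2016.05.19/log")

println(filtered_df.count())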
pyspark and python
reading from ES index (pyspark)
pyspark provides the Python bindings for the Spark platform; since data scientists typically already know Python, this makes it easy for them to write code for distributed computing. The following code snippet shows you how to read an Elasticsearch index from Python.
pyspark --jars /tmp/elasticsearch-hadoop-2.3.2.jar

conf = {"es.resource" : "filebeat-2016.05.19/log",
        "es.nodes" : "ela-n1",
        "es.query" : "?q=*",
        "es.nodes.client.only" : "true"}

rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                         "org.apache.hadoop.io.NullWritable",
                         "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                         conf=conf)

rdd.first()
Starting from Spark 1.4, you can also read an Elasticsearch index in a more straightforward way using SQLContext and a DataFrame.
pyspark --jars /tmp/elasticsearch-hadoop-2.3.2.jar \
        --conf spark.es.resource="filebeat-2016.05.19/log" \
        --conf spark.es.nodes="ela-n1" \
        --conf spark.es.nodes.client.only=true

# read in ES index/type "filebeat-2016.05.19/log"
es_df = sqlContext.read.format("org.elasticsearch.spark.sql").load("filebeat-2016.05.19/log")
es_df.printSchema()
es_df.show(10)
writing to ES index (python with spark-submit)
In the following example, you can see how a Python Spark application can be run using spark-submit.
Step 1) copy the following code to es_spark_write.py
# es_spark_write.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("WriteToES")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    es_conf = {"es.nodes" : "ela-n1",
               "es.port" : "9200",
               "es.nodes.client.only" : "true",
               "es.resource" : "sensor_counts/metrics"}

    # load the sensor parquet file into a dataframe
    es_df_p = sqlContext.read.parquet("/user/hloader/SENSOR.db/eventhistory_00000001/day=2016-04-20/34de184c2921296-67bec2fd236fec8e_1294729350_data.0.parq")

    # count events per element and map each row to a (key, document) pair
    es_df_pf = es_df_p.groupBy("element_id").count().map(lambda (a, b): ('id', {'element_id': a, 'count': b}))

    # write the (key, document) pairs to elasticsearch
    es_df_pf.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)
Step 2) submit the python application using spark-submit
spark-submit --master yarn-cluster --jars /tmp/elasticsearch-hadoop-2.3.2.jar es_spark_write.py
Conclusion
This blog post goes into the details of how to query Elasticsearch from Spark, both interactively and in batch, using Scala and Python. Most importantly, it also shows how to write to an Elasticsearch index, which opens up the possibility of performing distributed batch analytics at large scale using Spark and visualizing the results with Kibana and Elasticsearch. You can refer to the elasticsearch-hadoop connector documentation for further options and possibilities.