Topic: This post is about profiling and performance tuning of distributed workloads and in particular Hadoop applications. You will learn of a profiler application we have developed and how it has successfully been applied to tuning Sqoop to improve the throughput of data transfer from Oracle to Hadoop.
Where is my Sqoop job spending CPU time?
One of the data feeds into our Hadoop service is from Oracle databases. For this we make use of Sqoop. In particular we use Sqoop to export Oracle data into Parquet on HDFS for later analysis with Impala and other tools. However, we observed suboptimal performance when running Sqoop jobs: each job was delivering low throughput, of the order of 2 MB/second. In order to increase the total throughput, we have to increase the numbers of mappers. This is not an ideal solution and has several unwanted consequences both on the source and destination systems. One of them is that since we are increasing the number of mappers, we are also increasing the number of utilized cores on the Hadoop cluster, impairing CPU usage for other applications. Initial investigations did not show any problems on the database nor on the HDFS side to light. As a result, we started investigating the behaviour of Sqoop and we decided to implement a distributed profiler in order to get a detailed view what Sqoop is exactly doing.
A profiler tool for Hadoop.
Hprofiler (Hadoop Profiler) is a tool we have developed based on the ideas of stack profiling and Flame Graph visualization (as described in the original work of Brendan Gregg). The goal is to make the tool as versatile as possible so that it can investigate generic Hadoop application and answers the question "Where is my distributed application spending CPU time?".
A major problem in distributed computing is identifying bottlenecks of a user's application. Especially in a highly distributed environment this is often not easy. Furthermore, profiling computing frameworks like Hadoop and Spark only complicate the matter because these are JVM-based architectures. The reason for this additional complexity is because there is no system table available for the methods executed on the JVM. Also, the JVM uses the stack pointer register in the CPU as a general purpose register, thus, breaking traditional stack walking. In the following we will build on top of the work of Brendan Gregg at described in ‘Java Mixed-Mode Flame Graphs at Netflix, JavaOne 2015’. In particular we will use the Java8 option (available on update 60 build 19 and higher) -XX:+PreserveFramePointer.
In our architecture, we make use of perf-map-agent. This is an agent that will generate such a mapping for Java applications. When the agent is attached to a running Java application it instructs the JVM to report code blobs generated by the JVM at runtime for various purposes. This also includes JIT-compiled methods and other meta-data. However, this approach has its limitations, i.e., by default, the JIT compiler will ignore to inline the information of small methods. This could prevent the profiler from obtaining valuable information for identifying the correct code path. The solution we used for solving this, as obtained from the literature, is to specify another JVM option (-XX:InlineSmallCode=n). Using this option, we tell the JVM to inline the meta-information of the compiled method (think of the method name and class) and dispatch the information to an attached Java agent, which in turn, can write the information to a system table, as discussed above.
Since this blogpost is mostly dedicated to how we resolved the issue with Sqoop, we will not further discuss how we build the profiler.
Sqoop workload analysis.
As discussed in the introduction, we have several Sqoop jobs transferring data with a suboptimal throughput. In order to fully comprehend what is really happening on the cluster while transferring the data, we applied our newly developed profiler to this job. This resulted in the FlameGraph show below.
Note: Since we are only interested what Sqoop is doing and in order to improve readability we removed all other codepaths related to the JVM. An interactive and full overview of the FlameGraph is shown here.
This FlameGraph shows which code code paths were 'hot' during execution (on CPU). The workload is CPU bound on the Hadoop nodes as we are writing to the Parquet data format. This means, a wide frame will indicate methods which were prominent on CPU since the width of a frame is determined by the amount of occurances in the obtained stack samples. This is especially usefull when identifying possible bottlenecks. For example, a wide frame could indicate inefficiencies in said method. However, since frames which are located at the bottom of the stack are relatively wide, one needs to investigate the methods which are executed on top of these. In the case of Sqoop, the AvroUtil::toAvroIdentifier immediatly caught our eye, especially because a lot of time on the method is spent on regex pattern compilation (methods on top of toAvroIdentifier). When checking the implementation, it becomes clear why this method is so CPU heavy. First of all, all regex patterns have to be compiled, and all matches have to be replaced with "_". This is problematic as well since Java strings are immutable, so a lot of copying is done. Finally, instead of just accessing the first character by using .charAt, the developers fetch the first character as a string in order to enable further pattern matching. As a result, we figured if we could significantly improve this method, we could gain a serious increase in throughput since this method takes, on average, 50% of the runtime (as can be deduced from the FlameGraph) while converting a datasource to Parquet.
In the Figure above we show our optimized implementation (on the left), and the orignal implementation as done by the Sqoop project (on the right). Initial testing indicated that our method is about 500% more performant than the original (test code available here).
Increased throughput of data transfer from Oracle to HDFS (Parquet).
The proof of the pudding is in the eating. We applied the Sqoop patch to a test system and measured the throughput of data transfer from Oracle to Hadoop Parquet with the new optimized method. Since we are converting the data source to Parquet, some conversion and compression has to be done by the mapper. As a result, the Oracle database session is idle most of the time waiting for processing on the Hadoop side to finish. This is instrumented by Oracle in the SQL*Net message from client wait event. However, if our improved Sqoop is more performant, we should see an increase in database utilization and network throughput.
In the above Figure we analyse the network throughput and wait time. On the top we see the obtained metrics of Sqoop using our optimization, and at the bottom, the original. Using our method, we observe an increase in database utilization and an increased network throughput (+244% in this example), proving that optimizing the hot toAvroIdentifier method increases the throughput of Sqoop, and thus proving the effectiveness of hprofiler.
How can you use hprofiler to profile your (distributed) applications?
Currently, hprofiler requires root permissions, so if you are running this on your cluster be sure that you have access to the root account. Furthermore, since hprofiler relies on SSH connections, it could be of use to share the SSH keys between the machines. However, CERN users do not need to worry about this, since Keberos handles this for them. HProfiler is currently available on GitHub (for CERN users also available on AFS).
In order to profile a certain distributed application, one needs to devise a selector. A selector is a pipe of, e.g., grep's in order to filter out the required PID's from ps -elf. Let's say that we have a Hadoop job with the name application_123456789. Using the information, we can construct the selector "grep 123456789". This enables hprofiler to search on the cluster for the related PID's in order to start the profiling process. Note that we omit "application_", since Hadoop uses different prefixes for it's mappers. Furthermore, more complex selectors are possible as well (e.g., for non-Hadoop applications) by adding more grep commands to the pipe.
Once a user constructed a selector which is able to resolve the correct PID's, one needs to run the sampling frequency, sampling duration, and the hosts he or she wants to profile. One could also specify the YARN cluster address. Hprofiler would then automatically fetch the cluster nodes and initiate SSH connections to them in order to determine if PID's related to the user-provided selector are available and thus marking a host as "in use". Taking the example from above, running hprofiler could be as simple as this:
sh hprofiler.sh -f 300 -t 60 -c [cluster address] -j "grep 123456789" -o results
This command will sample the PID's obtained with the selector with a frequency of 300 Hz for 60 seconds. When the profiler is done sampling the program stacks, it will aggregate them, build a FlameGraph and put the results of all host in the results folder. Finally, it will do a final aggregation on a cluster level, in order to provide the user with a "cluster average" utilisation. However, in order to profile an application, one needs to add some JVM flags in order to enable stack walking on the JVM (-XX:+PreserveFramePointer -XX:InlineSmallCode=200), as discussed in the "Hadoop Profiler" section above.
Conclusion and acknowledgments
This post introduces HProfiler a profiling tool for troubleshooting distributed application and details an example of how hprofiler has been successfully used to troubleshooting Sqoop. HProfiler has allowed the author to identify and optimize a method in Sqoop responsible for a large amount of CPU utilization/wastage. A test version of Sqoop implementing the optimizations identified with HProfiler has shown improved throughput for data transfer from Oracle to HDFS as Parquetfiles (roughly a 2x increase). A patch has been submitted to the Sqoop developers, and we are awaiting review. Meanwhile, you can clone our Sqoop fork with the optimization we described above.
Many thanks to Zbigniew Baranowski, Daniel Lanza and Luca Canali of CERN IT-DB group for collaboration on this work. Hprofiler is based on perf, java stack profiling with flame graph visualization originally developed by Brendan Gregg, and the perf Java agent developed by Johannes Rudolph.
And as always, suggestions or feedback are more then welcome!