One of the big topics in the BigData community is Map/Reduce. There are a lot of good blogs that explain what Map/Reduce does and how it works logically, so I won’t repeat it (look here, here and here for a few). Very few of them, however, explain the technical flow of things, which I at least need in order to understand the performance implications. You can always throw more hardware at a Map/Reduce job to improve the overall time. I don’t like that as a general solution, and many Map/Reduce programs can be optimized quite easily if you know what to look for. And optimizing a large Map/Reduce job translates directly into ROI!
The Word Count Example
I went over some blogs and tutorials about the performance of Map/Reduce. Here is one that I liked. While there are a lot of good tips out there, none, except the one mentioned, talks about the Map/Reduce program itself. Most dive right into the various hadoop options to improve distribution and utilization. While this is important, I think we should start with the actual problem we are trying to solve, meaning the Map/Reduce job itself.
To keep things simple I am using Amazon’s Elastic Map Reduce. In my setup I started a new Job Flow with multiple steps for every execution. The Job Flow consisted of one master node and two task nodes, all of them Small Standard instances.
While AWS Elastic Map/Reduce has its drawbacks in terms of startup and file latency (Amazon S3 has a high volatility), it is a very easy and consistent way to execute Map/Reduce jobs without having to set up your own hadoop cluster. And you only pay for what you need! I started out with the word count example that you see in every Map/Reduce documentation, tutorial or blog. The job produces output files that list each word with its total occurrence count.
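Schematically (with made-up counts, since the actual output depends on the input corpus), each line of such an output file pairs a word with its total:

```
a       13204
about   171
above   42
across  96
```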
The idea is to count the occurrences of every word in a large number of text files. I processed around 30 files totaling roughly 200MB in size. I ran the original python version and then made a very small change to it. Without touching the hadoop configuration I cut the execution time in half:
The Original Code:
```python
#!/usr/bin/python
import sys
import re

def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    try:
        while line:
            for word in pattern.findall(line):
                # Emit every single occurrence with value 1.
                print "LongValueSum:" + word.lower() + "\t" + "1"
            line = sys.stdin.readline()
    except "end of file":
        return None

if __name__ == "__main__":
    main(sys.argv)
```
The Optimized Code:
```python
#!/usr/bin/python
import sys
import re

def main(argv):
    line = sys.stdin.readline()
    pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
    counts = dict()
    try:
        while line:
            for word in pattern.findall(line):
                word = word.lower()
                counts[word] = counts.get(word, 0) + 1
            # Flush periodically so the dictionary does not grow unbounded.
            if len(counts) > 10000:
                for word, count in counts.iteritems():
                    print "LongValueSum:" + word + "\t" + str(count)
                counts.clear()
            line = sys.stdin.readline()
        # Emit whatever is left at end of input.
        for word, count in counts.iteritems():
            print "LongValueSum:" + word + "\t" + str(count)
    except "end of file":
        return None

if __name__ == "__main__":
    main(sys.argv)
```
Instead of “emitting” every word with value 1 to the OutputCollector, I did an internal reduce before emitting. The result is that instead of emitting the word ‘a’ 1000 times with value 1, I emit it once with value 1000. The end result of the job is the same, but it takes half the time. To understand why, we need to look at the execution flow of Map/Reduce.
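The effect is easy to quantify. Here is a minimal sketch (plain Python, not the Streaming scripts themselves) that compares how many records each strategy pushes through the collector for the same input:

```python
import re
from collections import Counter

PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9]*")

def emitted_naive(lines):
    """One record per word occurrence, value always 1 (original approach)."""
    return [(w.lower(), 1) for line in lines for w in PATTERN.findall(line)]

def emitted_pre_reduced(lines):
    """One record per distinct word, value = local count (optimized approach)."""
    counts = Counter(w.lower() for line in lines for w in PATTERN.findall(line))
    return sorted(counts.items())

lines = ["a rose is a rose is a rose"]
print(len(emitted_naive(lines)))        # 8 records cross the collector
print(len(emitted_pre_reduced(lines)))  # only 3 distinct words are emitted
```

The more skewed the word distribution, the bigger the gap between the two numbers, and every record saved is serialization, spill and shuffle work saved.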
Execution Path and Distribution
Elastic Map Reduce first schedules a Map Task per file (or part of a file). It then feeds each line of the file into the map function. The map function emits each key/value pair, in this case each word of the line, to the OutputCollector. Each emitted key/value pair is written to an intermediate file for the later reduce. The Shuffle Process makes sure that each key, in this case each word, is sent to the same reduce task (meaning hadoop node) for aggregation. If we emit the same word multiple times it also needs to be written and sent multiple times, which results in more I/O (disk and network). The logical conclusion is that we should “pre-reduce” on a per-task-node basis and send the minimal amount of data. This is what the Combiner is for, which is really a Reducer that runs locally on the same node after the mapping. So we should be fine, right? Not really.
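The routing guarantee of the shuffle can be pictured as simple hash partitioning. This is only a sketch; Hadoop’s default HashPartitioner works on the key’s hashCode, but the principle is the same:

```python
def partition(key, num_reducers):
    # Every occurrence of the same key maps to the same reducer index,
    # so one reduce task ends up seeing all values for that key.
    return hash(key) % num_reducers

# The same word always lands on the same reducer within one job:
print(partition("hadoop", 4) == partition("hadoop", 4))  # True
```

This is exactly why duplicate emissions hurt: every copy of the same key travels over the network to that one reducer instead of being summed up locally first.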
Inside of Amazons Elastic Map Reduce
To get a better idea of where the time was spent, I deployed Dynatrace into Amazon’s Map Reduce environment. This can be done in a fully automated fashion with a simple bootstrap action (I will publish the result as a Fastpack on our community at a later time).
The original python version took roughly 5 minutes per run (between 290 and 320 seconds), while the optimized one took around 3 minutes (160-170 seconds). I used Dynatrace to split those run times into their different components to get a feel for where the time goes. Some numbers have a rather high volatility which, as I found out, is due to Amazon S3 and, to a smaller degree, garbage collection. I executed the job several times and the volatility did not have a big impact on the overall execution time.
The next thing we see is that the combine time has dropped dramatically; we could say it nearly vanished! That makes sense: after all, we made sure to emit fewer duplicates, so there is less to combine. In fact it might make sense to stop combining altogether, as we will see later on. Another item that has improved dramatically is the sort. Again that makes a lot of sense: less data to sort. While the majority of the combine and sort happens in a separate thread, it still saves a lot of CPU and I/O time!
On the other hand, neither the shuffle nor the reduce time itself has really changed. Via a hotspot analysis I identified the fluctuations the table shows as AWS S3 volatility issues, so I ignored them. The fact that we see no significant improvement here makes sense: the resulting intermediate files of each map task do not look much different, whether we combine or use the optimized code.
So it really was the optimization of the map operation itself that led to the overall improvement in job run time. While I might have achieved the same goal by doubling the number of map nodes, it would have cost me more to do so.
What happens during mapping
To understand why that simple change has such a large impact we need to look at what happens to emitted keys in a Map Job.
What most Map/Reduce tutorials forget to mention is that the collect method called by the Mapper serializes the key/value directly to an in-memory buffer, as can be seen in the diagram above and the hotspot below.
Once that buffer reaches a certain saturation, the Spill Thread kicks in and writes the data to an intermediate file (this is controlled by several io.sort.spill.* options). Map/Reduce normally deals with a large amount of potentially never-repeating data, so it has to spill to file eventually.
It is not enough to simply dump the data to file; the content has to be sorted and combined first. The sort is a preparation for the shuffle process and is relatively efficient (it sorts on the binary representation of the keys, because the actual order is not important). The combine, however, needs to de-serialize the keys and values again prior to writing.
So emitting a key multiple times has:
- a direct negative impact on map time and CPU usage, due to more serialization
- an indirect negative impact on CPU, due to more spilling and additional deserialization in the combine step
- a direct impact on the map task, due to more intermediate files, which make the final merge more expensive
Slower mapping obviously impacts the overall job time directly. The more data we emit, the more CPU and I/O is consumed by the Spill Thread. If the Spill Thread is too slow (e.g. expensive combine, slow disk), the in-memory buffer might become fully saturated, in which case the map task has to wait (this can be tuned via the io.sort.spill.percent hadoop option).
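For reference, these are the relevant knobs in Hadoop 1.x (a configuration sketch; the values shown are the defaults of that era, not a tuning recommendation):

```xml
<!-- mapred-site.xml, Hadoop 1.x option names -->
<configuration>
  <property>
    <name>io.sort.mb</name>
    <value>100</value>
    <!-- size (MB) of the in-memory map output buffer -->
  </property>
  <property>
    <name>io.sort.spill.percent</name>
    <value>0.80</value>
    <!-- buffer fill level that triggers the Spill Thread -->
  </property>
</configuration>
```

Raising these buys headroom, but it only treats the symptom; emitting less data in the first place removes the pressure at the source.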
After the Map Task finishes the actual mapping, it writes, sorts and combines the remaining data to file. Finally it merges all intermediate files into a single output file (which it might combine once more). More emitted keys thus mean more intermediate files to merge as well.
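That final merge is essentially a k-way merge of sorted runs, which can be sketched with Python’s standard library:

```python
import heapq
from itertools import groupby

# Stand-ins for two sorted intermediate spill files of one map task.
spill_1 = [("be", 1), ("not", 1), ("to", 1)]
spill_2 = [("be", 1), ("or", 1), ("to", 1)]

# heapq.merge streams the sorted runs into one sorted sequence,
# which is what the map task writes as its single output file.
merged = heapq.merge(spill_1, spill_2)

# The optional combine on merge: sum the values per key.
combined = [(k, sum(v for _, v in grp))
            for k, grp in groupby(merged, key=lambda kv: kv[0])]
print(combined)  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```

The cost of this step grows with the number and size of the spill files, which is why fewer emitted keys pay off once more here.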
While the final flush only “slows” us down by 1.5 seconds, this still amounts to roughly 8 percent of the Map Task. So it really does make a lot of sense to optimize the output of the map operation prior to the combine or reduce step. It saves CPU, disk and network I/O, and that of course means fewer nodes are needed for the same work!
The one million X factor
Until now I have tried to explain the I/O and CPU implications of emitting many keys, but there is another factor that should be considered when writing Map/Reduce jobs. The map function is executed potentially millions of times. Every millisecond consumed there can add up to minutes of job time. Indeed, most of the gain of my “optimization” came from speeding up the mapping itself and not from more effective combining and disk writes. On average each map call had a little less to do, and that paid off.
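A back-of-the-envelope calculation (with illustrative numbers, not measurements from this job) shows why per-call cost matters so much:

```python
# Assume ~200 MB of text at ~6 bytes per word: roughly 35 million map emissions.
words = 200 * 1024 * 1024 // 6

# Shaving a single microsecond off the per-word work...
saved_per_word_s = 1e-6

# ...already saves about half a minute of pure CPU time.
saved_total_s = words * saved_per_word_s
print(round(saved_total_s))  # ~35 seconds
```

Multiply that by a few microseconds of interpreter overhead per record and the minutes of difference between the runs stop being surprising.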
What struck me when I first looked at Map/Reduce was that most samples and tutorials use scripting languages like python, perl or something else, via the Hadoop Streaming framework. While I understand that this lowers the barrier to writing Map/Reduce jobs, it should not be used for serious tasks! To illustrate this I ran a randomly selected java version of the Word Count sample. The result is another 50-60% improvement on top of the optimized python version (in a larger job it might be even better).
The table shows the various execution times for:
- Optimized Java: ~1.5 minutes job execution time
The same trick as in python; if anybody really wants the code, let me know.
- Optimized Java with no Combiner: roughly 10 seconds faster than the optimized one
As pointed out, the pre-reduce in the map method makes the combine nearly redundant. The improvement in overall job time is minimal, however, due to the small size of the job.
- Original Java: ~2.5 minutes
We see that all the times (mapping, combining, sorting, spilling) are a lot higher, as we have come to expect.
- Optimized Python: ~3 minutes
- Non-optimized python: ~5 minutes
Java beats Python in every run, and the optimized java version is twice as fast as the optimized python version. Remember that this is a small example and that the hadoop parameters are the same for all runs. In addition, CPU was never a limiting factor. If you execute the same small piece of code millions of times, even small differences matter. The difference between mapping a single line in java and python may not even be measurable, but with 200 MB of text it adds up to more than a minute! The same would be true for small changes in any java Map/Reduce job. And the difference between the original and the optimized java version is still more than a 60% improvement!
Map Reduce is a very powerful and elegant way to distribute the processing of large amounts of data across many hosts. It is also a bit of a brute, and it pays off to analyze and optimize the performance of the map and reduce tasks before we start playing with hadoop options. While Map/Reduce can reduce job time by throwing more hardware at the problem, easy optimizations often achieve a similar effect. In the cloud and on AWS Elastic Map Reduce, that means less cost!