Recently I’ve been working on a project to both learn and do a proof of concept using Hadoop for some data processing. The most I’ve done with Hadoop until this was a couple tutorials using word count examples. This project called for much more processing and the prospect of trying to figure out how to do that in straight MapReduce was daunting.
I was aware of Cascading and since all of our work is done in Java these days I decided to give that a try. So the first prototype was developed using Cascading and proved fairly successful. The code was kind of a mess though as Java code of any size tends to be. Since I have done a little bit of programming in Clojure over the last year or so I knew of Cascalog so decided to give it a try and see how it would compare with just doing it ourselves in Cascading.
Let me summarize the project first then I’ll share what I learned in the process.
The project is to take data in the form of pipe delimited text files that come in every night from about 2,000 individual stores, compute some aggregated stats and populate an Oracle database table with the results. Each line in the text files is an individual item sold at the store and the files contain all sales at the store for the last seven years or so. The Hadoop job needs to run over all the transactions in all of the files to calculate the totals. (I realize there are better ways to do this by only getting and processing updated data each day, but this is more of a learning project and evaluation of Hadoop, Cascading and Cascalog). All in all there are about 480 million lines in all of the files.
The incoming data looks like this:
store #|invoice #|customer #|invoice date|quantity|price|item type|line #|sub-line #|transaction id 16228|286688|6768|2012-06-02 13:48:06.634000|1.00|0.00|S|8|0|865052
It needs to calculate the number of unique customers, unique invoices, customer visits, total number of inventory items (item type “I”), total number of service items (item type “S”), total sales of inventory items, total sales of service items grouped by store, month and year of the sale.
The Cascading job that we created processes this data in about an hour and a half on my little 3 node MapR Hadoop cluster. So that was the standard I was using as I started rewriting it in Cascalog. My first results were very disappointing. The first attempt ran in about 3 hours. This first attempt used several custom operations including about 4 different aggregators.
In the Cascading job we had put all of the calculations into a buffer instead of several aggregators so that we only have to run over the incoming data once and calculate all the stats at once time. So my first idea to speed it up was to rewrite it to use a single buffer to do most of the calculations. Doing that gave me an 11% speed up. The time to run went from about 180 minutes to 150 minutes. Still not good enough.
A little more reading led me to this posting on MapR’s Q & A forum. I had noticed before that the job was only using one reducer and thought that was strange. Now I knew why it was doing that. I changed my job to use 5 reducers and that resulted in another 13% speed up (from 150 minutes to 130 minutes). Getting closer.
After posting a question looking for some advice on the Cascalog Google group Nathan suggested profiling the job using a tool such as YourKit. At about the same time I just happened to be reading the chapter on Java interop in the Clojure Programming book by Chas Emerick, Brian Carper and Christophe Grand. That chapter contains some great information on type hinting in Clojure to avoid Java reflection.
When I fired up YourKit and profiled the job I saw immediately that there was a lot of reflection going on. The most time consuming method call trace was:
clojure.lang.Var.applyTo(ISeq) ====> clojure.lang.Reflector.invokeInstanceMethod(Object, String, Object)
and many others just like that. So I proceeded to try to eliminate all Java reflection calls from the YourKit profile of the job. Here is one example. This snippet of code extracts the month number from a formatted date string using the parse-month function
(def date-formatter (DateTimeFormat/forPattern "yyyy-MM-dd HH:mm:ss.SSSSSS")) (defn parse-local-date-time [date] "Returns a Joda Time LocalDateTime object from the given string using the date-formatter format." (. date-formatter parseLocalDateTime date)) (defn month-from-local-date-time [date-time] "Returns the month number from the given Joda Time LocalDateTime object." (. date-time getMonthOfYear)) (defn parse-month [date-string] "Returns the month number from the given date string." (month-from-local-date-time (parse-local-date-time date-string)))
Can you spot the reflection going on here? No, I couldn’t either. So using (set! *warn-on-reflection*) in the repl and compiling the functions one by one I figured it out. Here is the hinted code:
(def ^DateTimeFormatter date-formatter (DateTimeFormat/forPattern "yyyy-MM-dd HH:mm:ss.SSSSSS")) (defn parse-local-date-time [^String date] "Returns a Joda Time LocalDateTime object from the given string using the date-formatter format." (. date-formatter parseLocalDateTime date)) (defn month-from-local-date-time [^LocalDateTime date-time] "Returns the month number from the given Joda Time LocalDateTime object." (. date-time getMonthOfYear)) (defn parse-month [date-string] "Returns the month number from the given date string." (month-from-local-date-time (parse-local-date-time date-string)))
There were quite a few functions that I added type hints to. This change resulted in a 38% speedup over the test with 5 reducers (130 minutes to 80 minutes). Now the job is running in about the same time as the Cascading job is. The next optimization I’m going to work on is speeding up all the data processing. The YourKit profile shows a lot of time being spent in the Joda time library working with the dates and I know I’m calling those methods way too often as you can probably see from the code above.
The Cascalog code is much easier to read and think about. Now that I have performance on par with raw Cascading I’m pretty happy and will move on with a couple other tasks in this project using Cascalog. I have learned that if you care about the performance of your Clojure code you HAVE to type hint a lot to avoid any Java reflection from places in your code that have high call rates.