Recently I have been working on using the HyperLogLog cardinality estimation algorithm with Cascalog. As I perused the web for examples of how other people were doing this, I found a couple of different approaches.

This post is an off-shoot of an excellent article on the subject, which also includes some good discussion about performance. Before you continue reading, I would encourage you to go over there and at least skim the article. They did a really nice job of explaining things, so I won't repeat most of it here.

In the article the authors did a very nice comparison of two different ways of using the library's HyperLogLog object. Basically, you can either use one object and "offer" values to it, or create a new object for each value and then merge it into another object that already holds values. Not totally unexpectedly, the performance is much better if you use one object and offer values to it.
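
Roughly, the two approaches look like this (a minimal sketch, assuming stream-lib's HyperLogLog class; the 12-bit size and the UUID strings are just placeholders):

;; approach 1: keep one estimator and "offer" every value to it
(def one-hll (HyperLogLog. 12))
(.offer one-hll "uuid-1")
(.offer one-hll "uuid-2")

;; approach 2: build a fresh single-value estimator per value,
;; then merge it into an existing estimator
(def merged-hll
  (.merge (doto (HyperLogLog. 12) (.offer "uuid-1"))
          (into-array HyperLogLog
                      [(doto (HyperLogLog. 12) (.offer "uuid-2"))])))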

However, this doesn't take into account some of the optimization strategies that one needs to use when building MapReduce jobs. One of the things that I have learned while working with Cascalog and MapReduce over the last year or so is that it is very important to do as much work in the map tasks as possible. In other words, you really want to minimize the amount of data that needs to pass from map task to reduce task.

So what does this mean in the world of Cascalog? I have interpreted it to mean preferring defparallelagg over the other custom aggregators when doing aggregation. The trade-off is that, in order to do part of the aggregation map-side, we don't get a chance to hold on to state the way we do with defaggregateop. So our defparallelagg is forced to create a new HLL object for each tuple value and then merge that object into another.

So our parallel aggregator will look like this:

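;; (assuming `c` is aliased to the namespace that provides defparallelagg,
;;  e.g. cascalog.ops)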
(c/defparallelagg agg-hyperloglog
  :init-var #'hll-construct
  :combine-var #'hll-merge)

The way this works is that the function bound to :init-var gets called once for each tuple. In this case the hll-construct function creates a HyperLogLog object, offers the tuple value to it, and returns the HLL object. The function bound to :combine-var is then called with the results of the :init-var calls, as well as the results of earlier combines, until only one value is left. In this case the hll-merge function takes two HyperLogLog objects (returned either from hll-construct or from a previous call to hll-merge) and simply merges them.

So what happens in our query is that each ?uuid value is put into its own HLL object and then successively merged into other HLL objects that already hold one or more ?uuid values. This happens first within each map task; the resulting single HLL objects are then sent from the map tasks to the reduce tasks, where they are all merged. And remember that this combining of HLLs happens per aggregation group in the query (in this case we're grouping by city, year, and day).
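
To make the shape of such a query concrete, here is a minimal sketch (the access-log-tap generator, the S3 output path, and the hll-cardinality helper are placeholders, not code from the original post; it assumes cascalog.api is referred in):

(defn hll-cardinality
  "Return the estimated number of distinct values held in a HyperLogLog."
  [^HyperLogLog hlog]
  (.cardinality hlog))

(?<- (hfs-textline "s3n://my-bucket/unique-visitors") ; sink the results to S3
     [?city ?year ?day ?unique-visitors]              ; ?city ?year ?day form the grouping key
     (access-log-tap ?city ?year ?day ?uuid)          ; placeholder source of log tuples
     (agg-hyperloglog ?uuid :> ?hll)                  ; one merged HLL per group
     (hll-cardinality ?hll :> ?unique-visitors))      ; runs after the aggregation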

Here are hll-construct and hll-merge:

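;; assumes the stream-lib implementation, e.g. in the ns form:
;;   (:import [com.clearspring.analytics.stream.cardinality HyperLogLog])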
(defn hll-construct
  "Construct a HyperLogLog and offer the given init-value. 12 bits gives a little less than 2% standard error."
  [init-value]
  (doto (HyperLogLog. 12) (.offer init-value)))

(defn hll-merge
  "Merge two or more HyperLogLog's together."
  [^HyperLogLog first-hlog & hlogs]
  (.merge ^HyperLogLog first-hlog (into-array HyperLogLog hlogs)))
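
As a quick sanity check at the REPL (hypothetical values, not from the original post), merging two single-value estimators should report an estimated cardinality of 2:

(.cardinality (hll-merge (hll-construct "uuid-a")
                         (hll-construct "uuid-b")))
;; => 2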

You can find all of the code that I used in my GitHub repository. Most of the code is taken directly from the original Screen6 blog post. The code for the parallel implementation was adapted from a post by Jeroen van Dijk on the Cascalog Google group.

Setup

I used write-ov-chipkaart-accesslog-file to create files of random data, each with 1 million lines of simulated log entries. Each file is about 60 MB. I then uploaded the files to S3 for the MapReduce runs. I did two runs: the first with 10 of these files and the second with 30 of them.

(for [i (range 1 11)] (write-ov-chipkaart-accesslog-file (str "/tmp/chipkaart-" i) 1000000))
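;; note: `for` is lazy; printing the result at the REPL forces it, but in a
;; program, wrap it in `doall` (or use `doseq`) so the files are actually written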

The Hadoop cluster I used was a 10-node Amazon EMR cluster using Large EC2 instances and Amazon's flavor of Hadoop. The output of the Cascalog job was sunk to S3. Each Cascalog query resulted in a single MapReduce job using 60 map tasks and 17 reduce tasks.

Results

The results of each run were measured using the stats from the JobTracker UI after the jobs had completed.

| Implementation        | Run time | Map tuples read | Map tuples written | Reduce tuples read | Reduce tuples written |
|-----------------------|----------|-----------------|--------------------|--------------------|-----------------------|
| Aggregate (10M lines) | 3:33     | 10,000,000      | 10,000,000         | 10,000,000         | 14                    |
| Parallel (10M lines)  | 2:55     | 10,000,000      | 840                | 840                | 14                    |
| Aggregate (30M lines) | 6:19     | 30,000,000      | 30,000,000         | 30,000,000         | 14                    |
| Parallel (30M lines)  | 5:21     | 30,000,000      | 840                | 840                | 14                    |

As you can see, the performance on this data is about 15-18% better for the defparallelagg implementation. I believe that the gain from offering values into a single HLL object is more than offset by the time needed to shuffle/sort and send all of those raw tuples to the reduce tasks. As the data gets bigger and bigger, I expect this difference to become even more pronounced.

So even though the merge implementation is about 5x slower than the offer implementation in the original article's comparison, when run as a Cascalog job on a cluster with a decent amount of data, the merge implementation comes out about 15-20% faster than the offer implementation.

I think the key thing to take away from this is that when measuring the performance of Cascalog jobs (and really any MapReduce job), we have to be sure to test on a real cluster with a sufficiently large amount of data.




Published 27 November 2013