Using HyperLogLog in Cascalog
Recently I have been working on using the HyperLogLog cardinality estimation algorithm with Cascalog. As I looked around the web for examples of how other people were doing this, I found a couple of different approaches.
This post is an off-shoot of an excellent article on the subject, which also includes some good discussion about performance. Before you continue reading, I would encourage you to go over there and at least skim the article. They did a really nice job of explaining things, so I won't repeat most of it here.
In the article the authors did a very nice comparison of two different ways of using the HyperLogLog object from the library. Basically, you can either use one object and "offer" values to it, or create a new object for each value and then merge it with another object that already has values in it. Not unexpectedly, the performance is much better if you use one object and offer values to it.
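To make the two approaches concrete, here is a rough interop sketch. It assumes Clearspring's stream-lib HyperLogLog class (which matches the constructor and methods used later in this post) and is only an illustration of the idea, not the benchmark code from that article.

(import 'com.clearspring.analytics.stream.cardinality.HyperLogLog)

;; Approach 1: a single HLL, offering each value to it.
(def single-hll (HyperLogLog. 12))
(.offer single-hll "uuid-1")
(.offer single-hll "uuid-2")

;; Approach 2: one tiny HLL per value, merged together afterwards.
(def merged-hll
  (.merge (doto (HyperLogLog. 12) (.offer "uuid-1"))
          (into-array HyperLogLog [(doto (HyperLogLog. 12) (.offer "uuid-2"))])))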
However, this doesn't take into account some of the optimization strategies that one needs to use when building MapReduce jobs. One of the things that I have learned while working with Cascalog and MapReduce over the last year or so is that it is very important to do as much work in the map tasks as possible. In other words, you really want to minimize the amount of data that needs to pass from map task to reduce task.
So what does this mean in the world of Cascalog? I have interpreted it to mean preferring defparallelagg to the other custom aggregators when doing aggregation. The problem is that in order to do some of the aggregation map-side, we don't get a chance to hold on to any state the way we do with defaggregateop. So our defparallelagg is forced to create a new HLL object for each tuple value and then merge that object into another.
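For contrast, the stateful reduce-side version would be written with defaggregateop, something like the sketch below. The name agg-hyperloglog-offer and the exact arities are my own illustration (assuming cascalog.api is referred in), not code from this post; the point is that none of it runs until every raw tuple has been shuffled to the reducers.

;; Sketch: a reduce-side aggregator that keeps one HLL per group
;; and offers each value into it. Name and arities are illustrative.
(defaggregateop agg-hyperloglog-offer
  ([] (HyperLogLog. 12))                              ;; init: fresh HLL for the group
  ([^HyperLogLog acc uuid] (doto acc (.offer uuid)))  ;; accumulate: offer each value
  ([acc] [acc]))                                      ;; finish: emit the single HLL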
By contrast, our parallel aggregator looks like this:
(c/defparallelagg agg-hyperloglog :init-var #'hll-construct :combine-var #'hll-merge)
The way this works is that the function we've defined as :init-var will get called once for each tuple. In this case the hll-construct function creates a HyperLogLog object, offers the tuple value to it, and returns the HLL object. Then the function defined as :combine-var will get called with the results of the :init-var calls, as well as the results of earlier combines, until there is only one value left. In this case, the hll-merge function gets two HyperLogLog objects (either returned from hll-construct or from a previous call to hll-merge) and simply merges them.
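Conceptually, for a group containing three values a, b and c, the framework ends up computing something like the expression below; the exact shape of the merge tree depends on how the tuples are split across map tasks.

;; a, b and c stand for three ?uuid values in the same group.
(hll-merge (hll-merge (hll-construct a)
                      (hll-construct b))
           (hll-construct c))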
So what's going to happen in our query is that each ?uuid value will be put into an HLL object and then successively merged into other HLL objects that already hold one or more ?uuid values. This happens first within each map task; then those single HLL objects are sent from the map tasks to the reduce tasks, where they are all combined (merged). And remember that this combining of HLLs happens per aggregation group in the query (in this case we're grouping by city, year and day).
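Put together, the query looks roughly like the sketch below. The generator accesslogs and its field names are placeholders of mine; the real schema comes from the code linked at the end of this post.

;; Sketch only: `accesslogs` and its fields are placeholder names.
(defn hll-per-city-day-query [accesslogs]
  (<- [?city ?year ?day ?hll]
      (accesslogs ?city ?year ?day ?uuid)
      (agg-hyperloglog ?uuid :> ?hll)))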
Here are hll-construct and hll-merge:

(defn hll-construct
  "Construct a HyperLogLog and offer the given init-value.
   12 bits gives a little less than 2% standard error."
  [init-value]
  (doto (HyperLogLog. 12) (.offer init-value)))

(defn hll-merge
  "Merge two or more HyperLogLog's together."
  [^HyperLogLog first-hlog & hlogs]
  (.merge ^HyperLogLog first-hlog (into-array HyperLogLog hlogs)))
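As a quick REPL sanity check (mine, not from the original post), merging two single-value HLLs should report a cardinality of roughly two:

;; Two distinct values offered, so the estimate should come back as 2.
(def hll-a (hll-construct "uuid-1"))
(def hll-b (hll-construct "uuid-2"))
(.cardinality (hll-merge hll-a hll-b))  ;; => 2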
You can find all of the code that I used in my GitHub repository. Most of the code is directly from the original blog post at Screen6. The code for the parallel implementation was adapted from a post on the Cascalog Google group by Jeroen van Dijk.
Setup
I used write-ov-chipkaart-accesslog-file to create files of random data, each with 1 million lines of simulated log entries. Each file is about 60MB. I then uploaded the files to S3 for the MapReduce runs. I did two runs: the first with 10 of these files and the second with 30 of them.
(for [i (range 1 11)] (write-ov-chipkaart-accesslog-file (str "/tmp/chipkaart-" i) 1000000))
The Hadoop cluster I used was a 10 node Amazon EMR cluster using Large EC2 nodes with Amazon's flavor of Hadoop. The output of the Cascalog job was sunk to S3. Each Cascalog query resulted in a single MapReduce job using 60 map tasks and 17 reduce tasks.
Results
The results of the job run were measured using the stats from the JobTracker UI after the jobs had completed.
| Implementation | Run time | Map tuples read | Map tuples written | Reduce tuples read | Reduce tuples written |
|---|---|---|---|---|---|
| Aggregate | 3:33 | 10,000,000 | 10,000,000 | 10,000,000 | 14 |
| Parallel | 2:55 | 10,000,000 | 840 | 840 | 14 |
| Aggregate | 6:19 | 30,000,000 | 30,000,000 | 30,000,000 | 14 |
| Parallel | 5:21 | 30,000,000 | 840 | 840 | 14 |
As you can see, the performance on this data is about 15-18% better for the defparallelagg implementation (roughly 18% on the 10-million-line run and 15% on the 30-million-line run). I believe the gain from offering values into a single HLL object is more than offset by the time needed to shuffle/sort and send every raw tuple to the reduce tasks; the parallel version only ships 840 pre-combined HLL objects (one per group per map task: 60 map tasks × 14 groups) instead of millions of raw tuples. As the data gets bigger and bigger this difference will become even more pronounced.
So even though the merge implementation is 5x slower than the offer implementation in isolation, when run as a Cascalog job on a cluster with a decent amount of data, the merge-based implementation comes out about 15-20% faster than the offer-based one.
I think the key thing to take away from this is that when measuring performance of Cascalog jobs (and really any MapReduce job) we have to be sure to test on a real cluster using a significant enough amount of data.