Jan 31, 2016
 

I recently finished up some data aggregation work involving Apache’s Hive, and as a means of getting some MapReduce work off the ground quickly, it’s pretty good. Hive’s goal is to abstract away MapReduce behind basic SQL queries, and on that front it succeeds. The fact that I’m ultimately doing MapReduce jobs is hidden except for what would look like a minor quirk if I didn’t know that was what was going on under the covers. That said, there were a couple of things I noticed both during development and with running the jobs on Amazon’s EMR service that are worth noting.

General Hive observations

This isn’t so much a pointer on how to use Hive as it is just something to note – data in your Hive tables ends up listed in the order you grouped them by in your INSERT command, not the order the columns are listed in your CREATE TABLE statement. Obviously this doesn’t impact your scripts, but don’t be surprised when the columns you see when you query a Hive database locally aren’t in the same order as they were when you defined the table.

I found that making simple intermediate tables made building out my final output easier. In fact, I think I had more helper tables than tables with the stats I actually wanted to view. The helper tables also gave me intermediate steps I could look at to investigate where my query logic was off if I saw errors in my test data. Another benefit was that I could break my processing into a series of fairly straightforward queries, which kept the logic more readable. The biggest downside was that I ended up with so many tables that it could be hard to keep track of where I had put which data, although a consistent naming convention helped clear that up. With a little thinking and planning, I was also able to re-use some of these intermediate tables in multiple queries.
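As a rough illustration of the helper-table approach, here is a minimal sketch. The table and column names (raw_events, user_daily_events, stats_by_user, user_id, event_time) are hypothetical and not from my actual jobs; the point is only that the helper table can be queried on its own when the final numbers look wrong.

```sql
-- Hypothetical names throughout: raw_events, user_daily_events, stats_by_user.

-- Helper table: one row per user per day, easy to inspect on its own.
CREATE TABLE user_daily_events AS
SELECT user_id,
       to_date(event_time) AS event_date,
       COUNT(*)            AS event_count
FROM raw_events
GROUP BY user_id, to_date(event_time);

-- Final stats table built from the helper table.
-- The same helper table can feed other queries as well.
CREATE TABLE stats_by_user AS
SELECT user_id,
       SUM(event_count) AS total_events,
       COUNT(*)         AS active_days
FROM user_daily_events
GROUP BY user_id;
```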

One error message I’d see periodically was that a certain field wasn’t in the GROUP BY expression. OK, Hive is really just a SQL wrapper around MapReduce, so this column was needed to help Hive figure out the mapping part. No big deal, just add the column to the GROUP BY clause and all’s well. But another thing I would see was an error that a field in a table was invalid, even though I knew for a fact the field was in the table in question. The actual issue, however, was much simpler: that field was missing from the GROUP BY expression. Why Hive didn’t just throw that error instead of claiming the field was invalid I have no idea, but the good news is that it’s an easy fix.
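To make the GROUP BY rule concrete, here is a small sketch against a hypothetical page_views table (the table and its columns are made up for illustration). The first query is commented out because Hive rejects it; the second is the fixed version.

```sql
-- Hypothetical table: page_views(user_id, country, view_time).

-- Rejected: country is selected but is neither aggregated nor grouped.
-- SELECT user_id, country, COUNT(*) AS views
-- FROM page_views
-- GROUP BY user_id;

-- Accepted: every non-aggregated column in the SELECT list
-- also appears in the GROUP BY clause.
SELECT user_id, country, COUNT(*) AS views
FROM page_views
GROUP BY user_id, country;
```

If Hive complains that a field is invalid and you know it exists, checking the GROUP BY clause against the SELECT list is a cheap first step.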

Actually, errors relating to fields missing from the GROUP BY clause were the only time that I was reminded that Hive runs MapReduce under the covers. Everything else was just SQL. This made Hive a great stepping-stone into doing MapReduce jobs, especially since about 80% of what we were doing fit very nicely into SQL queries anyhow (more on the other 20% later).

By the way, Hive writes everything but column headers from the tables it populates to standard error. If you’re hoping to capture output from the Hive jobs, stderr is the output you’re going to want.

Observations about running Hive on Amazon’s EMR platform

The first thing you should know if you’re running Hive scripts on EMR is that the number of reducers is the number of files that get written out. That’s not a big deal if a person is never going to read those files, but I recommend setting the number of reducers to 1, at least during your early development and testing, using the command:

set hive.exec.reducers.max=1;

In some of my early testing with a handful of data files, I noticed that I usually ended up with several files with a size of 0 bytes, and 1 file with actual data, so when you’re working with a reasonable subset of data you’re probably not going to notice much difference. This is probably impacting the EMR performance I’ll talk about later, but leaving the number of reducers at 1 makes it easy to download the raw output and review it.

Hive lets you write custom, user-defined functions that extend its functionality (I have a couple for parsing bits of data in my jobs). However, a heads-up for those using Java 8 as their language of choice and hoping to run their Hive jobs on Amazon’s EMR: the EMR machines don’t have Java 8 by default. Luckily, this problem is easily solved with a custom bootstrap script that downloads and installs it when the VM starts up. You can set this up in the “Bootstrap Actions” at the bottom of the “General Cluster Settings” page when setting up your EMR cluster.
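For anyone who hasn’t used a user-defined function before, registering one from a Hive script looks roughly like the sketch below. The jar location, the function name parse_device, the class name, and the raw_events table are all hypothetical; substitute your own, and treat the S3 path in particular as an assumption about where the jar is stored.

```sql
-- Hypothetical jar path, function name, and class name.
ADD JAR s3://my-bucket/jars/my-hive-udfs.jar;
CREATE TEMPORARY FUNCTION parse_device AS 'com.example.hive.udf.ParseDevice';

-- Once registered, the function is used like any built-in.
SELECT parse_device(user_agent) AS device,
       COUNT(*)                 AS hits
FROM raw_events
GROUP BY parse_device(user_agent);
```

A temporary function only lasts for the session, so the ADD JAR and CREATE TEMPORARY FUNCTION lines need to be at the top of every script that uses it.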

Another thing I noticed: the logs from your EMR cluster (remember, they’re in stderr) seem to be updated well after the fact. I’ve seen the cluster listed as running for 30 minutes in the EMR console, only to check later and see that the cluster finished after 28 minutes. I’ve also seen the logs remain unchanged for several minutes, only to get updated with the results of several queries all at once. All the information will be in there eventually, but don’t rely on the logs to be real-time.

Another thing I noticed is that EMR seems slow, at least when I run it against relatively small sets of test data. I would see runs take 10 minutes or less on my laptop that take 30-40 minutes on EMR. However, those test sets were run on just a primary node rather than a full cluster, since I was more interested in verifying that the queries worked on EMR and not just on my laptop. I also haven’t compared the performance of doing a full run of data on a cluster versus my laptop. I suspect running Hive on EMR has enough overhead that it’s counter-productive for small datasets, but that it becomes much more efficient when tasked with a large volume of data. The performance I was seeing may also relate to the fact that our data is in a large number of relatively small files; writing fewer files with more data in each of them may speed things up. Ultimately, I’m a believer in getting the software working, then shipping, then tuning for speed. Also, this processing is for our internal use only right now, so the fact that our offline processing is slow is a pretty acceptable cost.

The biggest challenge I ran into with doing Hive jobs on EMR was building all-time aggregations of the stats from all our data runs. Doing this on regular tables was easy enough, but the issue came when I was running these same queries on data that I partition in the regular runs. Trying to combine those into a collection of all-time aggregations would leave me with just the regular data run’s data in the all-time table. Obviously, this was no good. The worst part of all of this was that it only happened when I ran my jobs on EMR. Yep, it was a classic “but it works on my machine” bug. After much trial, a lot of errors, and some consultation with one of my co-workers, the solution wound up being to keep a permanent secondary table that has the results of each regular run appended to it, and then to rebuild the all-time stats from the sum of all the secondary table’s stats.

This means I’m basically duplicating the final output of my regular data processing runs, but it preserves partitioning and gives me accurate all-time stats, which is better than where I started. As for why it worked on my machine, I’m guessing it had something to do with the fact that the data on my laptop stays in a Hive database, whereas when we run it on EMR we write the results out to text files. I don’t have any proof; it’s just a working hypothesis. Again, this only impacts dynamically partitioned tables, but it’s also the kind of thing that you only see right before you’re ready to release, which is the worst time to see data integrity problems.

Assume our secondary table is named stats_rollup_partitioned_by_field, our all-time stats table is named stats_all_time_partitioned_by_field, and the regular run results are in a table named stats_partitioned_by_field. (All our data lives in text files, so when I say “table” I’m referring to a file in a directory that’s specified by an input parameter set when I run the job, so the stats_partitioned_by_field tables from different runs never overlap.) With those names, the solution looks like this:

 

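-- Append the results of this run to the permanent rollup table.
INSERT INTO TABLE stats_rollup_partitioned_by_field PARTITION (field)
    SELECT * FROM stats_partitioned_by_field;

-- Rebuild the all-time stats from everything in the rollup table.
-- The partition column goes last in the SELECT list and must be in GROUP BY.
INSERT OVERWRITE TABLE stats_all_time_partitioned_by_field PARTITION (field)
    SELECT SUM(aggregationField), field FROM stats_rollup_partitioned_by_field
    GROUP BY field;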
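One note on the queries above: an INSERT with PARTITION (field) and no fixed value is a dynamic-partition insert, and Hive typically needs that turned on before it will run one. A minimal sketch of the settings, assuming your cluster doesn’t already set them by default:

```sql
-- Allow dynamic partitioning, and allow every partition column to be
-- dynamic (strict mode requires at least one static partition value).
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
```

These go at the top of the script, before the first INSERT that relies on them. Hive matches dynamic partition columns by position, which is why the partition column has to come last in the SELECT list.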

All in all, Hive is a good choice for basic MapReduce projects. User-defined functions allow you to extend your Hive jobs beyond basic SQL as needed. While dynamic partitioning of your data can cause some unexpected behavior, it’s very handy for splitting large data sets into separate directories. This means that filtering data in my reporting application costs nothing more than streaming the results from a different file. So long as you’re not unioning or otherwise trying to directly combine two dynamically partitioned tables, you should be fine.
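To show what the partitioning buys you, here is a small sketch using the all-time table named earlier. The value 'some_value' is a placeholder. Hive stores each partition in its own subdirectory, named in the form field=some_value, so a filter on the partition column only has to touch that one directory.

```sql
-- List the partitions (and therefore the directories) Hive knows about.
SHOW PARTITIONS stats_all_time_partitioned_by_field;

-- A filter on the partition column reads only the matching directory,
-- e.g. .../stats_all_time_partitioned_by_field/field=some_value/
SELECT *
FROM stats_all_time_partitioned_by_field
WHERE field = 'some_value';
```

An application outside of Hive can take advantage of the same layout by reading the files under the directory for the value it cares about.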

Odds are you’re not going to be running production Hive jobs on a laptop or desktop, and that instead you’re going to be using a cluster somewhere. All I can say is grab a subset of your data files, put them on a small cluster, and run them there to make sure there aren’t any issues that don’t appear on your laptop.

Hive is surprisingly flexible and easy to use, thanks to the fact that most developers already know SQL. The fact that it can read from and write data out to CSV files makes it a good choice for offline data processing for web applications. Not only can your application easily parse the data, but you can download a file and understand it yourself.
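For reference, a table backed by comma-separated text files can be declared along the lines of the sketch below. The column type, the OUTPUT_DIR variable name, and the use of hiveconf substitution are assumptions for illustration, not a copy of my actual table definitions.

```sql
-- Assumed: aggregationField is an integer count, and OUTPUT_DIR is passed
-- in when the job is launched (e.g. hive -hiveconf OUTPUT_DIR=...).
CREATE EXTERNAL TABLE IF NOT EXISTS stats_partitioned_by_field (
    aggregationField BIGINT
)
PARTITIONED BY (field STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '${hiveconf:OUTPUT_DIR}/stats_partitioned_by_field';
```

Because the table is EXTERNAL, dropping it in Hive leaves the files in place, which suits a setup where another application reads those files directly. Note that simple delimited text like this doesn’t handle quoted fields that contain commas.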

Posted at 10:05 PM