Posted tagged ‘Hadoop’

Apache Pig Tips #1

December 2, 2009

Pig is a new and growing platform on top of Hadoop that makes writing jobs easier: you can avoid writing Map and Reduce functions in Java directly, while still being able to do so if you choose. It provides a set of built-in operators and functions, such as FILTER, FOREACH, and COUNT, that you would otherwise have to write yourself for each data manipulation you want to perform. Unfortunately, the Pig documentation is fairly sparse, and performing what you would think is a basic manipulation can become very difficult if there are no examples.

In this post, I’m going to provide some examples based on what I have learned about Pig in the last week. I labeled this as Apache Pig Tips #1 because I expect I may write more in the future as I uncover additional usages.

My problem domain includes a data set that has multiple IDs and a result field:

{tcid, tpid, tsid, date, result}

There are a few more fields, but I’ll leave those out for brevity. A quick description of each field: the tcid is the id of the Test Case that a result was inserted for. The tpid is the Test Plan that the Test Case was a part of for this result. The tsid is the Test Suite that the Test Plan belongs to. The date is the date the result was added, and the result is the actual result (Pass, Fail, Postponed, etc.).

Now a Test Plan can have multiple Test Cases in it, but it can only contain each Test Case once. A Test Case can also be in multiple Test Plans (though again only once for each Plan). A Test Suite can have multiple Test Plans, but each Test Plan belongs to exactly one Test Suite. Results for a Test Case in a Test Plan can be inserted multiple times. Maybe the first time it was tested it failed, so a Fail entry is added. At a later date it passes, so a new entry is made with a Pass result. We need to generate a report that shows how many Pass and Fail results there are per Test Suite, using only the latest result for each Test Case in each Test Plan (ignoring previous ones).

The tab-separated data is located on HDFS in a file named ‘allresults’. First we need to load the data into an alias:

A = LOAD 'allresults' USING PigStorage() AS (tcid:int, tpid:int, tsid:int, date:chararray, result:chararray);

Next we need to find all Test Case/Test Plan combinations and group by them. This will give us a list of items that has multiple results of different dates, but all for the same test case in a test plan.

B = GROUP A BY (tcid, tpid) PARALLEL 100;

The Pig documentation mentions that the GROUP keyword can be applied to a single alias and that BY can take a single item. What isn’t easily discovered in the documentation is that the item can be a tuple, which you can define inline by surrounding multiple fields with (). Normally your group by looks like: B = GROUP A BY tcid; However, to group on multiple fields so that each entry is a unique combination of those fields, you can surround them with () to make a tuple.
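As a rough sketch of what the tuple key gives you, the snippet below groups on two fields and then reads the parts of the key back out. The file name and field names come from this post; the alias names (results, by_case_plan, key_counts) are invented for the example.

```pig
-- Sketch only: alias names are made up; the data layout is the one described above.
results = LOAD 'allresults' USING PigStorage()
          AS (tcid:int, tpid:int, tsid:int, date:chararray, result:chararray);

-- Group on a two-field tuple key instead of a single field.
by_case_plan = GROUP results BY (tcid, tpid);

-- The key is exposed as a tuple named 'group'; its parts can be read by name.
key_counts = FOREACH by_case_plan GENERATE
                 group.tcid AS tcid,
                 group.tpid AS tpid,
                 COUNT(results) AS num_results;

-- DESCRIBE prints the schema of an alias, a quick way to confirm its shape.
DESCRIBE by_case_plan;
```

Running DESCRIBE on an alias after each step is a cheap way to check that a relation has the structure you expect before building on it.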

Currently the definition of B looks something like this:

{group: (tcid:int, tpid:int), A: {tcid:int, tpid:int, tsid:int, date:chararray, result:chararray}}

Basically we have a Bag where each item has a tuple named group containing the unique tcid and tpid, along with a bag named A that contains 1 or more result rows. We need to look at that inner bag and remove all but the most recent result row.

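X = FOREACH B {
    -- newest first; ORDER sorts ascending unless DESC is given
    X1 = ORDER A BY date DESC;
    X2 = LIMIT X1 1;
    GENERATE FLATTEN(X2);
}

This will loop through all items that were grouped by tcid and tpid. For each one it will order the inner result bag by date, descending. ORDER sorts in ascending order by default, so the DESC is required to put the most recent result first. Because date is loaded as a chararray, this also relies on the dates being stored in a format that sorts correctly as text, such as YYYY-MM-DD. We then take only the first item from each of those ordered bags (the most recent result). We export to X the flattened version of the limited bag. This produces a Bag of tuples with all of the older results removed.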

After that we can split up all of the results into separate aliases by filtering on X multiple times:

pass = FILTER X BY result == 'Pass';
fail = FILTER X BY result == 'Fail';
postpone = FILTER X BY result == 'Postponed';
-- group by suite and count it
passbysuite = GROUP pass BY tsid PARALLEL 100;
failbysuite = GROUP fail BY tsid PARALLEL 100;
postponebysuite = GROUP postpone BY tsid PARALLEL 100;
-- generate counts
passcount = FOREACH passbysuite GENERATE group, COUNT(pass);
failcount = FOREACH failbysuite GENERATE group, COUNT(fail);
postponecount = FOREACH postponebysuite GENERATE group, COUNT(postpone);

We now have 3 Bags, each containing a group (the tsid) along with a count of how many of the latest results of that type exist for that suite.
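If one combined output is more convenient than three separate ones, a possible alternative is to group on the (tsid, result) tuple and count once. This is only a sketch: it assumes X is the alias produced by the nested FOREACH above, and the alias names by_suite_result and counts and the output path 'suite_result_counts' are made up for illustration.

```pig
-- Sketch only: X is assumed to hold just the latest result per (tcid, tpid).
-- One group per (suite, result type) pair replaces the three FILTER/GROUP pairs.
by_suite_result = GROUP X BY (tsid, result) PARALLEL 100;

-- Flatten the tuple key back into two columns and count the rows in each group.
counts = FOREACH by_suite_result GENERATE
             FLATTEN(group) AS (tsid, result),
             COUNT(X) AS num_results;

-- Hypothetical output location; each line holds tsid, result, num_results.
STORE counts INTO 'suite_result_counts' USING PigStorage();
```

The trade-off is that every result type that occurs in the data gets counted, not only the three filtered on above, so the report code has to pick out the rows it cares about.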

Pig Frustrations

November 30, 2009

My desire to implement better scalability by pre-processing reports on the Grid has led me to Pig. Unfortunately, while Pig does remove some of the difficulties of writing for Hadoop (you no longer have to write all of the map-reduce jobs yourself in Java), it has many limitations.

The biggest limitation I’ve found so far is just the lack of documentation. There are a few tutorials and a language reference, but not really all that much more. Even internally at Yahoo there are only a few more bits of documentation than are found at the official Apache site. I know that Pig is very new and hasn’t even hit a 1.0 release yet; however, the documentation really has to improve in order for people to start using it more.

Even when looking at the language reference itself I find there are limitations. For instance, you cannot use a FOREACH inside of a FOREACH. There’s no nested looping like that. This makes working with complex data structures quite difficult. The system seems to be built around simple sorting, filtering, and grouping. However, if you want to group multiple times to create more advanced data structures (bags within bags within bags) and then sort the innermost bag, there isn’t a construct for that that I have found.

Also, I’ve run into error messages that do not really explain what went wrong. Since there is little documentation and there are few users so far, it’s not easy to resolve some of these errors. A quick search will usually turn up a snippet of code where the error was defined, or a list of error codes on a web page with no real descriptions to speak of. Definitely no possible solutions.

Despite those flaws, it still seems beneficial to write in Pig rather than writing manual MapReduce jobs.

It’s also possible that some of these flaws can be fixed by writing extensions in Java to call from my Pig scripts. I haven’t gathered up all of the requirements to build and compile custom functions into a jar yet, and perhaps that will be the next step. It’s just unfortunate that some of these things which I feel should be fairly basic are not included in the base package.

I’m still researching, however, and it’s possible I just haven’t discovered the way to do what I need. If that changes I’ll be sure to update.

Dynamic Offline Reports

November 23, 2009

Many applications have the primary concern of storing and retrieving data. The raw data by itself is often not very useful, so an additional process is put into place to turn that data into useful information. Many applications generate these reports from the data quickly at the user’s request through a narrow SQL select statement, in-application data processing, or both. However, in larger applications where the data is too large to hold in memory at one time, the processing is too heavy, and report customization is too varied, information generation needs to be pushed to its own scalable system.

A lot of data naturally has various pieces of metadata associated with it: which user added the record, the date it was added or modified (or both), maybe the size of the record, categories, tags, keywords, or other pieces of associated data that are used to break it up into more manageable chunks. This metadata is useful, but only if we can aggregate on it to generate specific information about the groups of items it defines.

Sometimes generating these more specific reports is as easy as adding additional WHERE or GROUP BY clauses in SQL. However, when more advanced business rules apply and there isn’t an easy or succinct way of extracting this information via a query, or when the query returns so much data that it causes memory issues, a different approach can be taken.

For instance, in an application I am currently working on we need to generate reports based on a table with about 5 million rows. The SQL queries we use can limit the number of rows returned to perhaps a few hundred thousand for some of our larger reports. However, a lot of the data needs to be processed in application code rather than by the database itself due to some special business rules. Because of this, we end up creating a large number of objects in Java to hold these result rows. If multiple users are generating different reports, we might end up holding too many of these objects in memory at a time and receive an OOM error. Also, the processing on this data can be intense enough that if the server is slammed with report requests the entire system slows down, causing difficulties for people wanting to insert or modify data. This is the situation I am in as I contemplate offline report generation.

The basic idea is that the main application should be concerned purely with manipulating the data of the system. That is basic CRUD stuff such as creating new records, updating them, the rare deletions, and showing single records to the user (so they can edit or delete them). We want that part of the application to remain fast, and not be affected by the purely read-only needs imposed by report generation. In order to nullify the impact, we move reporting to its own system that reads from a separate read-only replica of our production database.

When a report request comes in to our application, we send a request to the separate reporting system. This can be done either as a web service or maybe an RPC. The reporting system uses its read-only copy of the data to generate the report numbers and sends them back, causing no slowdown for insertions or the regular operation of the main application.

This doesn’t solve our OOM issues, however, as many drivers for our database (MySQL) return ResultSet objects holding the entire contents of the results, which might be too large to fit into memory. However, since we’re working from a read-only copy anyway, we can convert the table or tables we use into flat files that can be read line by line: read some lines, perform some intermediate result processing, deallocate those lines, and move on to the next ones. Since our reports mostly generate statistical data over a large data set, we can process that data set in parallel using multiple threads, or possibly multiple computers in a Hadoop cluster.

By making report generation asynchronous to our application’s general workflow, we free up the processing power and the connection pool used to handle requests. Users either poll to see whether a report is finished or are notified by the system when it is ready, so we avoid the situations where all of our available connections or resources are tied up processing reports. There is also the added possibility of continuously generating all possible report data on a separate machine or cluster, trading increased storage requirements for decreased access time.

I’m currently researching the use of MapReduce in Hadoop for processing this flat file of all data for reports. In addition, I’m researching a few languages that are reported to be good at concurrent processing so that I can benefit from multiple cores when generating reports from our raw data. My current focus is on Scala, Erlang, and Clojure, but I do not have much to report on those areas yet. If anyone has any information on those languages as far as report generation based on a largish data set goes (currently 5 million rows, but the rate of growth is fairly alarming), let me know.