Apache Pig Tips #1

Pig is a new and growing platform on top of Hadoop that makes writing jobs easier because you can avoid writing Map and Reduce functions in Java directly while still allowing you to do so if you choose. Instead it creates a bunch of basic functions such as COUNT, FILTER, FOREACH, and such that you would normally have to independently write for each data manipulation you want to perform. Unfortunately, the Pig documentation is fairly sparse and performing what you would think is a basic manipulation can become very difficult if there are no examples.

In this post, I’m going to provide some examples based on what I have learned about Pig in the last week. I labeled this as Apache Pig Tips #1 because I expect I may write more in the future as I uncover additional usages.

My problem domain includes a data set that has multiple IDs and a result field:

{tcid, tpid, tsid, date, result}

There are a few more fields but I’ll leave those out for brevity. A quick description of what each of those IDs are: the tcid is a Test Case id that a result was inserted for. The tpid is the Test Plan that the Test Case was a part of for this result. The tsid is the Test Suite that the Test Plan belongs to. The date is the date the result was added, and the result is the actual result (Pass, Fail, Postponed… etc).

Now a Test Plan can have multiple Test Cases in it, however it can only have each test case in it once. A Test Case can also be in multiple Test Plans (though again only once for each Plan). A Test Suite can have multiple Test Plans, but each Test Plan belongs to exactly one Test Suite. Results for a test case in a test plan can be inserted multiple times. Maybe the first time it was tested it failed so a Fail entry is added. At a later date it passes so a new entry is  made with a Pass result. We need to generate a report that shows how many Pass and Fail per test suite using only the latest result (ignoring previous ones).

The tab separated data is located on HDFS in a file named ‘allresults’. First we need to load the data into a variable:

A = LOAD 'allresults' USING PigStorage() AS (tcid:int, tpid:int, tsid:int, date:chararray, result:chararray);

Next we need to find all Test Case/Test Plan combinations and group by them. This will give us a list of items that has multiple results of different dates, but all for the same test case in a test plan.

B = GROUP A BY (tcid, tpid) PARALLEL 100;

The Pig documentation mentions that the GROUP keyword can be applied to a single alias and the BY can apply to a single item. What isn’t easily discovered in the documentation is that the item can be a tuple, which you can define in line by surrounding multiple fields with (). Normally your group by looks like: B = GROUP A BY tcid; However, to group on multiple fields so that each entry is a unique combination of those fields you can surrounded it with () to make it a tuple.

Currently the definition of B looks something like this:

{group {tpid:int, tcid:int}, A {tcid:int, tpid:int, tsid:int, date:chararray, result:chararray}};

Basically we have a Bag where each item in the bag has a Bag containing the unique tpid and tcid, along with a second bag that contains 1 or more result rows. We need to look at that second bag and remove all but the most recent result rows so that we have just the most recent result.

X = FOREACH B {
    X1 = ORDER A BY date;
    X2 = LIMIT X1 1;
   GENERATE FLATTEN(X2);
}

This will loop through all items that were grouped by tcid and tpid. For each one it will order the inner result bag by date (descending by default). We then take only the first item from each of those ordered bags (the most recent result). We export to X the flattened version of the limited bag. This produces just a Bag of tuples that have all the non-recent results removed.

After that we can split up all of the results into separate aliases by filtering on X multiple times:

pass = FILTER X BY result == 'Pass';
fail = FILTER X BY result == 'Fail';
postpone = FILTER X BY result == 'Postpone';
-- group by suite and count it
passbysuite = GROUP pass BY tsid PARALLEL 100;
failbysuite = GROUP fail BY tsid PARALLEL 100;
postponebysuite = GROUP postpone BY tsid PARALLEL 100;
-- generate counts
passcount = FOREACH passbysuite GENERATE group, COUNT(pass);
failcount = FOREACH failbysuite GENERATE group, COUNT(fail);
postponecount = FOREACH postponebysuite GENERATE group, COUNT(postpone);

We now have 3 Bags, each containing a group which represents the tsid, along with a number correlating to how many results of each type existed.

Explore posts in the same categories: Grid, Scalability

Tags: , ,

You can comment below, or link to this permanent URL from your own site.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s


%d bloggers like this: