Often when working with data in Pig, it makes sense to keep the data grouped by one or more fields, which means you are working with bags. Unfortunately there aren't many convenient ways to work with bags in Pig out of the box. For this reason Apache DataFu provides several UDFs for performing useful operations on bags that come up in practice.
The CountEach UDF can be used to count the number of instances of items within a bag. It produces a new bag of the distinct items with their respective counts appended.
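For example, given a bag of single-field tuples, the behavior looks like this (a minimal sketch with illustrative data):
DEFINE CountEach datafu.pig.bags.CountEach();
-- ({(x),(x),(y)})
data = LOAD 'input' AS (B: bag{T: tuple(v:chararray)});
-- ({(x,2),(y,1)})
counted = FOREACH data GENERATE CountEach(B);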
Let's take a look at an example where this might be useful. Suppose that we have a recommendation system, and we've tracked what items have been recommended.
-- load the tracking data: one record per item shown to a member (the '$items' path is illustrative)
items = LOAD '$items' AS (memberId:int, itemId:long);
Let's say that we want to compute the number of times an item has been shown to each user. Our output will have this schema:
{memberId:int, items: {(itemId:long, cnt:long)}}
Typically we would have to perform two GROUP operations to get this output: first group by (memberId,itemId) and count, then group a second time by memberId. This requires two MapReduce jobs.
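The two-step version might look like this (a sketch, assuming the items relation above; the intermediate relation names are illustrative):
-- first MapReduce job: count occurrences of each (memberId,itemId) pair
items_counted = FOREACH (GROUP items BY (memberId, itemId)) GENERATE
    FLATTEN(group) AS (memberId, itemId),
    COUNT(items) AS cnt;
-- second MapReduce job: regroup the counts by member
items_grouped = FOREACH (GROUP items_counted BY memberId) GENERATE
    group AS memberId,
    items_counted.(itemId, cnt) AS items;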
To make this case more efficient, we can use the CountEach UDF. It will produce the same output, but it only requires a single GROUP operation:
DEFINE CountEach datafu.pig.bags.CountEach();
items = FOREACH (GROUP items BY memberId) GENERATE
    group as memberId,
    CountEach(items.(itemId)) as items;
BagConcat can be used to concatenate the tuples from two or more bags into a single bag:
define BagConcat datafu.pig.bags.BagConcat();
-- ({(1),(2),(3)},{(4),(5)},{(6),(7)})
input = LOAD 'input' AS (B1: bag{T: tuple(v:INT)}, B2: bag{T: tuple(v:INT)}, B3: bag{T: tuple(v:INT)});
-- ({(1),(2),(3),(4),(5),(6),(7)})
output = FOREACH input GENERATE BagConcat(B1,B2,B3);
BagConcat can also be used to concatenate all tuples present in a bag of bags.
define BagConcat datafu.pig.bags.BagConcat();
-- ({({(1),(2),(3)}),({(3),(4),(5)})})
input = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})});
-- ({(1),(2),(3),(3),(4),(5)})
output = FOREACH input GENERATE BagConcat(A);
Pig has a GROUP operation that can be applied to a relation. It produces a new relation in which the input tuples are grouped by a particular key; each output tuple carries the key in a field named group, alongside a bag containing the grouped input tuples for that key.
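As a quick refresher, GROUP on a relation looks like this (a sketch with illustrative data):
-- (1,A),(1,B),(2,C)
data = LOAD 'input' AS (k: int, v: chararray);
grouped = GROUP data BY k;
-- grouped: {group: int, data: {(k: int, v: chararray)}}
-- (1,{(1,A),(1,B)}),(2,{(2,C)})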
BagGroup mimics the GROUP operation from Pig. The difference is that BagGroup operates within a bag, rather than on a relation. This can be useful when operating on bags that are not very large: we can group the tuples within such a bag without a separate GROUP statement, which would add another MapReduce job. With BagGroup we avoid this.
In the following example we take an input_bag consisting of key-value pairs (k,v) and group the tuples by k. This produces a new bag whose tuples consist of group and input_bag, where group is the grouping key k and input_bag contains the tuples from the original input_bag that share that value of k.
define BagGroup datafu.pig.bags.BagGroup();
-- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)})
data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)});
-- Group input_bag by k
data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped;
-- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}}
-- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})})
We could also project out the key from the final input_bag using a nested FOREACH, so that the bag only consists of the value v:
data3 = FOREACH data2 {
    -- project only the value
    projected = FOREACH grouped GENERATE group, input_bag.(v);
    GENERATE projected as grouped;
}
-- data3: {grouped: {(group: int,input_bag: {(v: chararray)})}}
-- ({(1,{(A),(B)}),(2,{(A),(B),(C)}),(3,{(A)})})
AppendToBag can be used to append a tuple to a bag:
define AppendToBag datafu.pig.bags.AppendToBag();
-- ({(1),(2),(3)},(4))
input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-- ({(1),(2),(3),(4)})
output = FOREACH input GENERATE AppendToBag(B,T);
PrependToBag can be used to prepend a tuple to a bag:
define PrependToBag datafu.pig.bags.PrependToBag();
-- ({(1),(2),(3)},(4))
input = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));
-- ({(4),(1),(2),(3)})
output = FOREACH input GENERATE PrependToBag(B,T);
Pig has a JOIN operator, but unfortunately it only operates on relations. Thus, if you wish to join tuples from two bags, you must first flatten, then join, then re-group. To make this process simpler, DataFu provides a BagLeftOuterJoin UDF.
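For reference, the flatten/join/re-group approach that the UDF replaces might look like this (a sketch; all relation and field names here are illustrative):
-- flatten each bag back into its own relation
flat_left = FOREACH grouped_data GENERATE id, FLATTEN(left_bag);
flat_right = FOREACH grouped_data GENERATE id, FLATTEN(right_bag);
-- join the flattened relations, keeping unmatched tuples from the left side
joined = JOIN flat_left BY (id, key) LEFT OUTER, flat_right BY (id, key);
-- re-group to restore one bag per id, costing another MapReduce job
regrouped = GROUP joined BY flat_left::id;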
Let's walk through an example where this is useful. Suppose that we are building a recommendation system. This system recommends items to users, and these recommendations may be ignored, accepted, or rejected. When analyzing this system, we have a stream of impression, accept, and reject events:
impressions = LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long);
accepts = LOAD '$accepts' AS (user_id:int, item_id:int, timestamp:long);
rejects = LOAD '$rejects' AS (user_id:int, item_id:int, timestamp:long);
What we want to produce from this data is a bag of item counts per user:
features: {user_id:int, items:{(item_id:int, impression_count:int, accept_count:int, reject_count:int)}}
Using DataFu's CountEach, we can efficiently produce the counts per item for impressions, accepts, and rejects as separate bags per user in a single MapReduce job:
define CountEach datafu.pig.bags.CountEach();
features_counted = FOREACH (COGROUP impressions BY user_id,
                            accepts BY user_id,
                            rejects BY user_id) GENERATE
    group as user_id,
    CountEach(impressions.item_id) as impressions,
    CountEach(accepts.item_id) as accepts,
    CountEach(rejects.item_id) as rejects;
This produces three bags per user, each consisting of (item_id,count) tuples. We can now join these bags together using BagLeftOuterJoin:
define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();
features_joined = FOREACH features_counted GENERATE
    user_id,
    BagLeftOuterJoin(
        impressions, 'item_id',
        accepts, 'item_id',
        rejects, 'item_id'
    ) as items;
We left join on the impressions here, since a user cannot accept or reject an item that was never seen. The left join can of course produce null values for accepts and rejects that did not occur, so let's clean those up by replacing the nulls with counts of zero:
define Coalesce datafu.pig.util.Coalesce();
features = FOREACH features_joined {
    projected = FOREACH items GENERATE
        impressions::item_id as item_id,
        impressions::count as impression_count,
        Coalesce(accepts::count, 0) as accept_count,
        Coalesce(rejects::count, 0) as reject_count;
    GENERATE user_id, projected as items;
}
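For illustration, a single tuple of the final features relation might look like (42,{(7,3,1,0),(9,1,0,1)}) (made-up data): user 42 was shown item 7 three times, accepting it once and never rejecting it.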