Apache DataFu Pig - Guide

Bag operations

Often when working with data in Pig, it makes sense to keep the data grouped by one or more fields, which means you are working with bags. Unfortunately, Pig offers few convenient ways to work with bags out of the box. For this reason, Apache DataFu provides several UDFs for bag operations that come up in practice.

Counting Items in Bags

The CountEach UDF can be used to count the number of instances of items within a bag. It produces a new bag of the distinct items with their respective counts appended.

Let's take a look at an example where this might be useful. Suppose that we have a recommendation system, and we've tracked what items have been recommended.

items = FOREACH items GENERATE memberId, itemId;

Let's say that we want to compute the number of times an item has been shown to each user. Our output will have this schema:

{memberId:int, items: {(itemId:long, cnt:long)}}

Typically we would have to perform two GROUP operations to get this output. First we group by (memberId,itemId) and count, and then we group a second time by memberId. This requires two MapReduce jobs.

To make this case more efficient, we can use the CountEach UDF. It will produce the same output, but it only requires a single GROUP operation:

DEFINE CountEach datafu.pig.bags.CountEach();

items = FOREACH (GROUP items BY memberId) GENERATE
  group as memberId,
  CountEach(items.(itemId)) as items;
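To make the semantics concrete, here is a minimal plain-Python model of what the single-GROUP version computes. It is only a sketch under stated assumptions: the `count_each` helper and the sample rows are hypothetical, bags are modeled as lists of tuples, and this is not how DataFu implements the UDF. Pig does not guarantee tuple order inside a bag, so compare sorted contents rather than relying on order.

```python
from collections import Counter


def count_each(bag):
    """Hypothetical model of CountEach: distinct tuples with a count appended."""
    counts = Counter(bag)  # bag is a list of hashable tuples
    return [item + (n,) for item, n in counts.items()]


# Hypothetical sample data: (memberId, itemId) rows.
rows = [(1, 10), (1, 10), (1, 20), (2, 10)]

# One grouping pass by memberId, standing in for GROUP items BY memberId.
grouped = {}
for member_id, item_id in rows:
    grouped.setdefault(member_id, []).append((item_id,))

# Count the items inside each member's bag, with no second grouping pass.
result = {member_id: count_each(bag) for member_id, bag in grouped.items()}
print(result)
```

The point of the model is that the per-item counting happens inside each member's bag, so only one grouping pass over the rows is needed.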

Bag Concatenation

BagConcat can be used to concatenate the tuples from two or more bags into a single bag:

define BagConcat datafu.pig.bags.BagConcat();

-- ({(1),(2),(3)},{(4),(5)},{(6),(7)})
data = LOAD 'input' AS (B1: bag{T: tuple(v:INT)}, B2: bag{T: tuple(v:INT)}, B3: bag{T: tuple(v:INT)});

-- ({(1),(2),(3),(4),(5),(6),(7)})
result = FOREACH data GENERATE BagConcat(B1,B2,B3);

BagConcat can also be used to concatenate all tuples present in a bag of bags.

define BagConcat datafu.pig.bags.BagConcat();

-- ({({(1),(2),(3)}),({(3),(4),(5)})})
data = LOAD 'input' AS (A: bag{T: tuple(bag{T2: tuple(v:INT)})});

-- ({(1),(2),(3),(3),(4),(5)})
result = FOREACH data GENERATE BagConcat(A);
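Both forms of concatenation can be modeled in a few lines of plain Python. This is a sketch of the behavior described above, not DataFu's implementation: the helper names `bag_concat` and `bag_concat_nested` are hypothetical, and bags are modeled as lists of tuples.

```python
from itertools import chain


def bag_concat(*bags):
    """Hypothetical model: concatenate the tuples of two or more bags."""
    return list(chain.from_iterable(bags))


def bag_concat_nested(bag_of_bags):
    """Hypothetical model: flatten a bag whose tuples each hold one inner bag."""
    return [t for (inner,) in bag_of_bags for t in inner]


# Mirrors the first example: three bags in, one bag out.
flat = bag_concat([(1,), (2,), (3,)], [(4,), (5,)], [(6,), (7,)])

# Mirrors the second example: a bag of single-field tuples, each wrapping a bag.
nested = bag_concat_nested([([(1,), (2,), (3,)],), ([(3,), (4,), (5,)],)])

print(flat)
print(nested)
```

Note that duplicates are kept: the value 3 appears twice in the nested result, matching the sample output above.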

Grouping Within a Bag

Pig has a GROUP operation that can be applied to a relation. It produces a new relation where the input tuples are grouped by a particular key. Each output tuple holds the key in a field named group, along with a bag containing the grouped tuples for that key.

BagGroup mimics the GROUP operation from Pig. The difference is that BagGroup operates within a bag rather than on a relation. This is useful when the bags are not very large: we can group the tuples within a bag without involving GROUP, which would result in another MapReduce job.

In the following example we take an input_bag consisting of key-value pairs (k,v) and group the tuples by k. This produces a new bag having tuples consisting of group and input_bag. The group corresponds to the grouping key k. The input_bag is a bag containing the tuples from the original input_bag that have the same k as group.

 define BagGroup datafu.pig.bags.BagGroup();

 data = LOAD 'input' AS (input_bag: bag {T: tuple(k: int, v: chararray)});
 -- ({(1,A),(1,B),(2,A),(2,B),(2,C),(3,A)})

 -- Group input_bag by k
 data2 = FOREACH data GENERATE BagGroup(input_bag, input_bag.(k)) as grouped;
 -- data2: {grouped: {(group: int,input_bag: {T: (k: int,v: chararray)})}}
 -- ({(1,{(1,A),(1,B)}),(2,{(2,A),(2,B),(2,C)}),(3,{(3,A)})})

We could also project out the key from the final input_bag using a nested FOREACH so that the bag only consists of the value v:

data3 = FOREACH data2 {
  -- project only the value
  projected = FOREACH grouped GENERATE group, input_bag.(v);
  GENERATE projected as grouped;
}

-- data3: {grouped: {(group: int,input_bag: {T: (v: chararray)})}}
-- ({(1,{(A),(B)}),(2,{(A),(B),(C)}),(3,{(A)})})
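The grouping and the follow-up projection can be modeled in plain Python as below. This is a sketch only: `bag_group` is a hypothetical helper, bags are lists of tuples, and the key is passed as a function rather than as a projected bag as in the Pig call.

```python
def bag_group(bag, key):
    """Hypothetical model of BagGroup: group the tuples of one bag by a key."""
    groups = {}
    for t in bag:
        groups.setdefault(key(t), []).append(t)
    # Each output tuple is (group, bag of the original tuples with that key).
    return list(groups.items())


input_bag = [(1, "A"), (1, "B"), (2, "A"), (2, "B"), (2, "C"), (3, "A")]

# Group input_bag by k, the first field of each tuple.
grouped = bag_group(input_bag, key=lambda t: t[0])

# Project only the value v inside each group, like the nested FOREACH.
projected = [(k, [(v,) for _, v in tuples]) for k, tuples in grouped]

print(grouped)
print(projected)
```

Everything here happens inside a single record, which is why no extra pass over the relation is involved.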

Append to Bag

AppendToBag can be used to append a tuple to a bag:

define AppendToBag datafu.pig.bags.AppendToBag();

-- ({(1),(2),(3)},(4))
data = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));

-- ({(1),(2),(3),(4)})
result = FOREACH data GENERATE AppendToBag(B,T);

Prepend to Bag

PrependToBag can be used to prepend a tuple to a bag:

define PrependToBag datafu.pig.bags.PrependToBag();

-- ({(1),(2),(3)},(4))
data = LOAD 'input' AS (B: bag{T: tuple(v:INT)}, T: tuple(v:INT));

-- ({(4),(1),(2),(3)})
result = FOREACH data GENERATE PrependToBag(B,T);
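For comparison, appending and prepending can be modeled in plain Python as follows. The helper names are hypothetical and bags are modeled as lists of tuples. The sketch only illustrates where the new tuple ends up and leaves the input bag unmodified.

```python
def append_to_bag(bag, t):
    """Hypothetical model of AppendToBag: new bag with t added at the end."""
    return bag + [t]


def prepend_to_bag(bag, t):
    """Hypothetical model of PrependToBag: new bag with t added at the front."""
    return [t] + bag


bag = [(1,), (2,), (3,)]
appended = append_to_bag(bag, (4,))
prepended = prepend_to_bag(bag, (4,))

print(appended)
print(prepended)
```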

Join Bags

Pig has a JOIN operator, but unfortunately it only operates on relations. Thus, if you wish to join tuples from two bags, you must first flatten, then join, then re-group. To make this process simpler DataFu provides a BagLeftOuterJoin UDF.

Let's walk through an example where this is useful. Suppose that we are building a recommendation system. This system recommends items to users, and these recommendations may be ignored, accepted, or rejected. When analyzing this system, we have a stream of impression, accept, and reject events:

impressions = LOAD '$impressions' AS (user_id:int, item_id:int, timestamp:long);
accepts = LOAD '$accepts' AS (user_id:int, item_id:int, timestamp:long);
rejects = LOAD '$rejects' AS (user_id:int, item_id:int, timestamp:long);

What we want to produce from this data is a bag of item counts per user:

features: {user_id:int, items:{(item_id:int, impression_count:int, accept_count:int, reject_count:int)}}

Using DataFu's CountEach, we can efficiently produce the counts per item for impressions, accepts, and rejects as separate bags per user using a single MapReduce job:

define CountEach datafu.pig.bags.CountEach();

features_counted = FOREACH (COGROUP impressions BY user_id,
                                    accepts BY user_id,
                                    rejects BY user_id) GENERATE
  group as user_id,
  CountEach(impressions.item_id) as impressions,
  CountEach(accepts.item_id) as accepts,
  CountEach(rejects.item_id) as rejects;

This produces three bags, consisting of (item_id,count). We can now join these bags together using BagLeftOuterJoin:

define BagLeftOuterJoin datafu.pig.bags.BagLeftOuterJoin();

features_joined = FOREACH features_counted GENERATE
  user_id,
  BagLeftOuterJoin(
    impressions, 'item_id',
    accepts, 'item_id',
    rejects, 'item_id'
  ) as items;
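A plain-Python model may help show what the joined bag looks like for one user. This is a sketch with several assumptions: `bag_left_outer_join` is a hypothetical helper, each bag holds (item_id, count) tuples, the join key is always the first field, and each item_id appears at most once per bag (which holds for CountEach output). Missing matches are filled with None, standing in for Pig nulls.

```python
def bag_left_outer_join(left, *rights):
    """Hypothetical model: left outer join of (item_id, count) bags on item_id."""
    # Index each right-hand bag by item_id; assumes keys are unique per bag.
    lookups = [dict(bag) for bag in rights]
    joined = []
    for item_id, count in left:
        row = (item_id, count)
        for lookup in lookups:
            if item_id in lookup:
                row += (item_id, lookup[item_id])
            else:
                row += (None, None)  # no match in this bag
        joined.append(row)
    return joined


# Hypothetical per-user bags, as CountEach might produce them.
impressions = [(10, 3), (20, 1), (30, 2)]
accepts = [(10, 1)]
rejects = [(20, 1)]

items = bag_left_outer_join(impressions, accepts, rejects)
print(items)
```

Every impression row survives the join, and the accept and reject fields stay empty wherever no matching event exists.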

We left join on the impressions here, since a user cannot accept or reject an item that was never shown. The left join can of course produce null values for accepts and rejects that did not occur, so let's clean those up by replacing null values with counts of zero:

define Coalesce datafu.pig.util.Coalesce();

features = FOREACH features_joined {
  projected = FOREACH items GENERATE
    impressions::item_id as item_id,
    impressions::count as impression_count,
    Coalesce(accepts::count, 0) as accept_count,
    Coalesce(rejects::count, 0) as reject_count;
  GENERATE user_id, projected as items;
}
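The cleanup step can be modeled in plain Python as below. This is a sketch: `coalesce` is a hypothetical stand-in that returns its first non-None argument, and the joined rows are made-up sample data laid out as (item_id, impression_count, item_id, accept_count, item_id, reject_count).

```python
def coalesce(*values):
    """Hypothetical model of Coalesce: first value that is not None."""
    for value in values:
        if value is not None:
            return value
    return None


# Hypothetical joined rows for one user; None marks a missing accept or reject.
joined_items = [
    (10, 3, 10, 1, None, None),
    (20, 1, None, None, 20, 1),
]

# Keep the impression key and count, and default missing counts to zero.
features = [
    (item_id, impression_count, coalesce(accept_count, 0), coalesce(reject_count, 0))
    for item_id, impression_count, _, accept_count, _, reject_count in joined_items
]
print(features)
```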