Apache DataFu Pig - Guide

More Tips and Tricks

Coalesce

Using ternary operators is fairly common in Pig. For example, often you want to replace null values with zero:

data = FOREACH data GENERATE (val IS NOT NULL ? val : 0) as result;

Or, sometimes you want to return the first non-null value among several fields:

data = FOREACH data GENERATE (val1 IS NOT NULL ? val1 :
               (val2 IS NOT NULL ? val2 :
               (val3 IS NOT NULL ? val3 :
               NULL))) as result;

The above code is hard to follow and cumbersome to write. To solve this problem, Apache DataFu provides the Coalesce UDF. It is very similar to the COALESCE function available in some SQL implementations: it returns the first non-null value among the arguments passed in. With Coalesce we can clean up the code above.

To replace any null value with 0:

DEFINE Coalesce datafu.pig.util.Coalesce();

data = FOREACH data GENERATE Coalesce(val,0) as result;

To return the first non-null value:

data = FOREACH data GENERATE Coalesce(val1,val2,val3) as result;
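The two uses can also be combined, so that the result falls back through several fields and ends in a constant default. The following is a minimal sketch rather than part of the original guide: the jar path, the input file name, and the field names are placeholders chosen for illustration.

```pig
-- Assumption: the jar path below is a placeholder; point it at your DataFu Pig jar.
REGISTER 'datafu-pig.jar';
DEFINE Coalesce datafu.pig.util.Coalesce();

-- Assumption: 'scores' is a hypothetical comma-separated input file.
scores = LOAD 'scores' USING PigStorage(',') AS (id:INT, score_a:INT, score_b:INT);

-- Take score_a if present, otherwise score_b, otherwise 0,
-- so that result is never null.
filled = FOREACH scores GENERATE id, Coalesce(score_a, score_b, 0) AS result;
```

All arguments here are integers, which mirrors the Coalesce(val,0) example above.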

Left Joining Multiple Relations

Suppose we have three data sets:

input1 = LOAD 'input1' using PigStorage(',') AS (key:INT,val:INT);
input2 = LOAD 'input2' using PigStorage(',') AS (key:INT,val:INT);
input3 = LOAD 'input3' using PigStorage(',') AS (key:INT,val:INT);

Let's say we want to left join input1 with input2 and input3. You can do this in SQL. Unfortunately Pig does not support outer joins on more than two relations.

-- DOES NOT WORK
joined = JOIN input1 BY key LEFT,
         input2 BY key, input3 BY key;

Instead, you have to join twice, which means two MapReduce jobs:

data1 = JOIN input1 BY key LEFT, input2 BY key;
data2 = JOIN data1 BY input1::key LEFT, input3 BY key;

This is unfortunate, as left joins are very common, and some applications routinely need to left join multiple relations. Take a recommendation system, for example: you start with a set of candidates to score and you join in multiple sets of features. Each set of features requires another join.

You can, however, effectively perform a left join by using COGROUP with multiple relations and some clever use of FLATTEN. Then only a single MapReduce job is required. But this gets pretty messy:

data1 = COGROUP input1 BY key, input2 BY key, input3 BY key;
data2 = FOREACH data1 GENERATE
  FLATTEN(input1), -- left join on this
  FLATTEN((IsEmpty(input2) ? TOBAG(TOTUPLE((int)null,(int)null)) : input2))
    AS (input2::key,input2::val),
  FLATTEN((IsEmpty(input3) ? TOBAG(TOTUPLE((int)null,(int)null)) : input3))
    AS (input3::key,input3::val);

As messy as this looks, it does work. It relies on the fact that flattening an empty bag produces no output. Therefore any key that has no records in input1 will be removed, which is the behavior we want from a left join. Since we don't want the lack of records in input2 or input3 to cause records to be removed, we replace the empty bag with a bag having just a single tuple with null values. Using these tricks we are able to simulate a left join on multiple relations.
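To make the empty-bag behavior concrete, here is a small walk-through on hypothetical sample data. The file contents shown in the comments are invented for illustration and are not from the original guide, and the order of the rows is not guaranteed.

```pig
-- Hypothetical file contents:
--   input1: 1,10   2,20
--   input2: 1,100
--   input3: 2,200  3,300
input1 = LOAD 'input1' USING PigStorage(',') AS (key:INT,val:INT);
input2 = LOAD 'input2' USING PigStorage(',') AS (key:INT,val:INT);
input3 = LOAD 'input3' USING PigStorage(',') AS (key:INT,val:INT);

grouped = COGROUP input1 BY key, input2 BY key, input3 BY key;
-- grouped has one row per key, with one bag per relation:
--   (1, {(1,10)}, {(1,100)}, {})
--   (2, {(2,20)}, {},        {(2,200)})
--   (3, {},       {},        {(3,300)})

-- Flattening input1 alone already drops key 3,
-- because its input1 bag is empty.
left_only = FOREACH grouped GENERATE FLATTEN(input1);
--   (1,10)
--   (2,20)
```

If the input2 and input3 bags were flattened as they are, keys 1 and 2 would be dropped as well, since each has one empty bag. Replacing each empty bag with a single tuple of nulls keeps them, giving (1,10,1,100,null,null) and (2,20,null,null,2,200).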

To clean up this code, DataFu provides the EmptyBagToNullFields UDF. This performs the same logic as above and makes the code much easier to write and understand:

DEFINE EmptyBagToNullFields datafu.pig.bags.EmptyBagToNullFields();

data = FOREACH (COGROUP input1 BY key, input2 BY key, input3 BY key) GENERATE
  FLATTEN(input1), -- left join on this
  FLATTEN(EmptyBagToNullFields(input2)),
  FLATTEN(EmptyBagToNullFields(input3));

While you're at it, why not create a macro:

DEFINE left_outer_join(relation1, key1, relation2, key2, relation3, key3) returns joined {
  cogrouped = COGROUP $relation1 BY $key1, $relation2 BY $key2, $relation3 BY $key3;
  $joined = FOREACH cogrouped GENERATE
    FLATTEN($relation1),
    FLATTEN(EmptyBagToNullFields($relation2)),
    FLATTEN(EmptyBagToNullFields($relation3));
}

Now you can simply apply your left join macro:

features = left_outer_join(input1, key, input2, key, input3, key);
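If it is unclear what the joined fields are called, Pig's DESCRIBE operator prints the schema of a relation. A small sketch, assuming the features relation produced by the macro call above:

```pig
-- Print the schema of the joined relation to see the generated field names
-- before projecting or filtering on them.
DESCRIBE features;
```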