public class EmpiricalCountEntropy
extends org.apache.pig.AccumulatorEvalFunc<java.lang.Double>
implements org.apache.pig.Algebraic
This UDF's constructor takes one optional argument: the logarithm base, which is defined the same way as in Entropy.
Unlike Entropy, which calculates entropy from a sorted bag of raw data in accumulative mode, this UDF calculates entropy from the data's occurrence frequencies, which do not need to be sorted, in either accumulative or algebraic mode.
How to use:
To use this UDF, first pre-compute the occurrence frequency of each data instance, often in an outer GROUP BY, and then pass those frequencies to this UDF in another outer GROUP BY to calculate the entropy.
Compared with Entropy, this UDF scales better on very large data sets, because it can distribute computation across mappers and use combiners to reduce the intermediate output sent from mappers to reducers.
Example:
define Entropy datafu.pig.stats.entropy.EmpiricalCountEntropy();
input = LOAD 'input' AS (val: double);
-- calculate the occurrence of each instance
counts_g = GROUP input BY val;
counts = FOREACH counts_g GENERATE COUNT(input) AS cnt;
-- calculate entropy
input_counts_g = GROUP counts ALL;
entropy = FOREACH input_counts_g GENERATE Entropy(counts) AS entropy;
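The quantity computed from the counts above can be sketched in plain Java. This is a minimal illustration of empirical entropy from occurrence counts, not the DataFu source: the class name `CountEntropySketch`, the numeric `base` parameter, and the handling of non-positive counts are assumptions made for the example. It uses the identity H = log(N) - (1/N) * sum(c_i * log(c_i)), where N is the sum of all counts, so the counts never have to be normalized or sorted first.

```java
/** Hypothetical helper (not part of DataFu): empirical entropy from occurrence counts. */
final class CountEntropySketch {

    /**
     * Computes H = ln(N) - (1/N) * sum(c_i * ln(c_i)) with N = sum(c_i),
     * then converts the result from natural log to the requested base.
     */
    static double entropy(long[] counts, double base) {
        long total = 0;
        double sumCLogC = 0.0;
        for (long c : counts) {
            if (c <= 0) {
                continue; // assumption of this sketch: non-positive counts are skipped
            }
            total += c;
            sumCLogC += c * Math.log(c);
        }
        if (total == 0) {
            return 0.0; // assumption of this sketch: empty input yields 0
        }
        double nats = Math.log(total) - sumCLogC / total;
        return nats / Math.log(base);
    }

    public static void main(String[] args) {
        // Three distinct values seen 2, 1 and 1 times: p = (0.5, 0.25, 0.25)
        long[] counts = {2, 1, 1};
        System.out.println("entropy (base 2): " + entropy(counts, 2.0));
    }
}
```

For the counts 2, 1, 1 the result is approximately 1.5 when the base is 2, which matches the textbook value for probabilities 0.5, 0.25, 0.25.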
Use case to calculate mutual information using EmpiricalCountEntropy:
define Entropy datafu.pig.stats.entropy.EmpiricalCountEntropy();
input = LOAD 'input' AS (valX: double, valY: double);
------------
-- calculate mutual information I(X, Y) using entropy
-- I(X, Y) = H(X) + H(Y) - H(X, Y)
------------
input_x_y_g = GROUP input BY (valX, valY);
input_x_y_cnt = FOREACH input_x_y_g GENERATE flatten(group) as (valX, valY), COUNT(input) AS cnt;
input_x_g = GROUP input_x_y_cnt BY valX;
input_x_cnt = FOREACH input_x_g GENERATE flatten(group) as valX, SUM(input_x_y_cnt.cnt) AS cnt;
input_y_g = GROUP input_x_y_cnt BY valY;
input_y_cnt = FOREACH input_y_g GENERATE flatten(group) as valY, SUM(input_x_y_cnt.cnt) AS cnt;
input_x_y_entropy_g = GROUP input_x_y_cnt ALL;
input_x_y_entropy = FOREACH input_x_y_entropy_g {
input_x_y_entropy_cnt = input_x_y_cnt.cnt;
GENERATE Entropy(input_x_y_entropy_cnt) AS x_y_entropy;
}
input_x_entropy_g = GROUP input_x_cnt ALL;
input_x_entropy = FOREACH input_x_entropy_g {
input_x_entropy_cnt = input_x_cnt.cnt;
GENERATE Entropy(input_x_entropy_cnt) AS x_entropy;
}
input_y_entropy_g = GROUP input_y_cnt ALL;
input_y_entropy = FOREACH input_y_entropy_g {
input_y_entropy_cnt = input_y_cnt.cnt;
GENERATE Entropy(input_y_entropy_cnt) AS y_entropy;
}
input_mi_cross = CROSS input_x_y_entropy, input_x_entropy, input_y_entropy;
input_mi = FOREACH input_mi_cross GENERATE (input_x_entropy::x_entropy +
input_y_entropy::y_entropy -
input_x_y_entropy::x_y_entropy) AS mi;
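The same identity, I(X, Y) = H(X) + H(Y) - H(X, Y), can be checked on a small in-memory contingency table. The sketch below is only an illustration of the arithmetic the script performs: the class `MutualInformationSketch`, the rectangular `long[][]` layout for the joint counts, and the use of natural logarithms are assumptions for the example, not part of DataFu.

```java
/** Hypothetical helper (not part of DataFu): mutual information from a table of joint counts. */
final class MutualInformationSketch {

    /** Empirical entropy in nats from occurrence counts; non-positive counts are skipped. */
    static double entropy(long[] counts) {
        long total = 0;
        double sumCLogC = 0.0;
        for (long c : counts) {
            if (c <= 0) {
                continue;
            }
            total += c;
            sumCLogC += c * Math.log(c);
        }
        return total == 0 ? 0.0 : Math.log(total) - sumCLogC / total;
    }

    /**
     * joint[i][j] is the number of times (x_i, y_j) was observed.
     * The table is assumed to be rectangular.
     */
    static double mutualInformation(long[][] joint) {
        int rows = joint.length;
        int cols = rows == 0 ? 0 : joint[0].length;
        long[] xCounts = new long[rows];        // marginal counts of X (row sums)
        long[] yCounts = new long[cols];        // marginal counts of Y (column sums)
        long[] xyCounts = new long[rows * cols]; // flattened joint counts
        for (int i = 0; i < rows; i++) {
            for (int j = 0; j < cols; j++) {
                long c = joint[i][j];
                xCounts[i] += c;
                yCounts[j] += c;
                xyCounts[i * cols + j] = c;
            }
        }
        return entropy(xCounts) + entropy(yCounts) - entropy(xyCounts);
    }

    public static void main(String[] args) {
        long[][] dependent = {{2, 0}, {0, 2}};   // Y is fully determined by X
        long[][] independent = {{1, 1}, {1, 1}}; // X and Y are independent
        System.out.println("dependent:   " + mutualInformation(dependent));
        System.out.println("independent: " + mutualInformation(independent));
    }
}
```

The row and column sums play the role of `input_x_cnt` and `input_y_cnt` in the script, and the flattened table plays the role of `input_x_y_cnt`.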
See Also: Entropy
Modifier and Type | Class and Description |
---|---|
static class | EmpiricalCountEntropy.Final |
static class | EmpiricalCountEntropy.Initial |
static class | EmpiricalCountEntropy.Intermediate |
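Pig's Algebraic interface splits an aggregate into an initial, an intermediate, and a final stage, which is what allows combiners to pre-aggregate map output. The following sketch shows one way an entropy-from-counts computation can be decomposed along those lines. It is an assumption-laden illustration, not the code of the nested classes above: the class `AlgebraicEntropySketch`, its method names, and the choice of partial state (sum of counts, sum of c * ln(c)) are all hypothetical.

```java
/** Hypothetical sketch (not the DataFu classes): entropy split into mergeable partial results. */
final class AlgebraicEntropySketch {

    /** Initial stage: turn one count into a partial state {sum of counts, sum of c * ln(c)}. */
    static double[] initial(long count) {
        if (count <= 0) {
            return new double[] {0.0, 0.0}; // assumption: non-positive counts contribute nothing
        }
        return new double[] {count, count * Math.log(count)};
    }

    /** Intermediate stage: merging two partial states is a component-wise sum. */
    static double[] combine(double[] a, double[] b) {
        return new double[] {a[0] + b[0], a[1] + b[1]};
    }

    /** Final stage: H = ln(N) - (1/N) * sum(c * ln(c)), in nats. */
    static double finish(double[] partial) {
        if (partial[0] <= 0) {
            return 0.0;
        }
        return Math.log(partial[0]) - partial[1] / partial[0];
    }

    public static void main(String[] args) {
        // Two "mappers" each see part of the counts; a "reducer" merges their partial states.
        double[] mapperA = combine(initial(2), initial(1));
        double[] mapperB = initial(1);
        System.out.println("entropy (nats): " + finish(combine(mapperA, mapperB)));
    }
}
```

Because the merge step is a plain sum, partial states can be combined in any order and any grouping, so only two numbers per partition need to travel from mappers to reducers.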
Constructor and Description |
---|
EmpiricalCountEntropy() |
EmpiricalCountEntropy(java.lang.String base) |
Modifier and Type | Method and Description |
---|---|
void | accumulate(org.apache.pig.data.Tuple input) |
void | cleanup() |
protected static org.apache.pig.data.Tuple | combine(org.apache.pig.data.DataBag values) |
java.lang.String | getFinal() |
java.lang.String | getInitial() |
java.lang.String | getIntermed() |
java.lang.Double | getValue() |
org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
Methods inherited from class org.apache.pig.EvalFunc: allowCompileTimeCalculation, finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, getSchemaType, getShipFiles, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
public EmpiricalCountEntropy() throws org.apache.pig.backend.executionengine.ExecException
Throws: org.apache.pig.backend.executionengine.ExecException
public EmpiricalCountEntropy(java.lang.String base) throws org.apache.pig.backend.executionengine.ExecException
Throws: org.apache.pig.backend.executionengine.ExecException
public java.lang.String getFinal()
Specified by: getFinal in interface org.apache.pig.Algebraic
public java.lang.String getInitial()
Specified by: getInitial in interface org.apache.pig.Algebraic
public java.lang.String getIntermed()
Specified by: getIntermed in interface org.apache.pig.Algebraic
protected static org.apache.pig.data.Tuple combine(org.apache.pig.data.DataBag values) throws org.apache.pig.backend.executionengine.ExecException
Throws: org.apache.pig.backend.executionengine.ExecException
public void accumulate(org.apache.pig.data.Tuple input) throws java.io.IOException
Specified by: accumulate in interface org.apache.pig.Accumulator<java.lang.Double>
Specified by: accumulate in class org.apache.pig.AccumulatorEvalFunc<java.lang.Double>
Throws: java.io.IOException
public java.lang.Double getValue()
Specified by: getValue in interface org.apache.pig.Accumulator<java.lang.Double>
Specified by: getValue in class org.apache.pig.AccumulatorEvalFunc<java.lang.Double>
public void cleanup()
Specified by: cleanup in interface org.apache.pig.Accumulator<java.lang.Double>
Specified by: cleanup in class org.apache.pig.AccumulatorEvalFunc<java.lang.Double>
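In accumulative mode Pig may call accumulate several times with successive batches of the same bag, then calls getValue once, and then cleanup before the next key. The sketch below mimics that lifecycle in plain Java without any Pig types. It is a simplified illustration under stated assumptions: the class `AccumulatorLifecycleSketch`, the `long[]` batches standing in for tuples, and the running state it keeps are hypothetical and not taken from the DataFu source.

```java
/** Hypothetical sketch (not the DataFu class): accumulate / getValue / cleanup lifecycle. */
final class AccumulatorLifecycleSketch {

    private long total;       // running sum of counts seen so far
    private double sumCLogC;  // running sum of c * ln(c)

    /** Called once per batch of counts; may be called many times for one key. */
    void accumulate(long[] batch) {
        for (long c : batch) {
            if (c <= 0) {
                continue; // assumption: non-positive counts are skipped
            }
            total += c;
            sumCLogC += c * Math.log(c);
        }
    }

    /** Called after the last batch: entropy in nats from the running sums. */
    double getValue() {
        return total == 0 ? 0.0 : Math.log(total) - sumCLogC / total;
    }

    /** Resets the running state so the instance can be reused for the next key. */
    void cleanup() {
        total = 0;
        sumCLogC = 0.0;
    }

    public static void main(String[] args) {
        AccumulatorLifecycleSketch acc = new AccumulatorLifecycleSketch();
        acc.accumulate(new long[] {2, 1}); // first batch
        acc.accumulate(new long[] {1});    // second batch of the same bag
        System.out.println("entropy (nats): " + acc.getValue());
        acc.cleanup();
    }
}
```

Keeping only two running sums means memory use stays constant regardless of how many batches arrive.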
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides: outputSchema in class org.apache.pig.EvalFunc<java.lang.Double>