public class StreamingQuantile
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.Tuple>
The algorithm is described here: http://www.cs.ucsb.edu/~suri/cs290/MunroPat.pdf
The implementation is based on the one in Sawzall, available here: szlquantile.cc
N.B.: all the data for a given key is sent to a single reducer, so make sure some partitioning is done (e.g., group by 'day') if the data is too large. In other words, this does not compute distributed quantiles.
Note that, unlike DataFu's standard Quantile UDF, the Munro-Paterson algorithm gives approximate quantiles and does not require the input bag to be sorted. Because it implements accumulate, StreamingQuantile can be much more efficient than Quantile for large amounts of data that do not fit in memory. Quantile must spill to disk when the input data is too large to fit in memory, which contributes to longer runtimes.
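To illustrate why no global sort is needed, here is a minimal, simplified sketch of the Munro-Paterson buffer-collapsing idea. The class and method names (`MunroPatersonSketch`, `add`, `quantile`) and the buffer size are hypothetical; this is not DataFu's or Sawzall's implementation, which may differ in how buffers are collapsed and how the final partial buffer is handled. Each buffer holds `k` sorted values; when two full buffers share a level, they are merged and every other element is kept, so each survivor carries double the weight and memory stays roughly `k` per level.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified Munro-Paterson-style quantile sketch (hypothetical; not DataFu's code). */
public class MunroPatersonSketch {
    private final int k;                                      // capacity of every buffer
    private final List<double[]> levels = new ArrayList<>();  // levels.get(i): sorted buffer of weight 2^i, or null
    private double[] incoming;                                // buffer still being filled (weight 1)
    private int incomingCount = 0;
    private long count = 0;                                   // total number of values added

    public MunroPatersonSketch(int k) {
        if (k < 1) throw new IllegalArgumentException("k must be positive");
        this.k = k;
        this.incoming = new double[k];
    }

    /** Adds one value; the input is never sorted as a whole, only one small buffer at a time. */
    public void add(double value) {
        incoming[incomingCount++] = value;
        count++;
        if (incomingCount == k) {
            Arrays.sort(incoming);
            carry(incoming, 0);
            incoming = new double[k];
            incomingCount = 0;
        }
    }

    // Like incrementing a binary counter: two full buffers at one level collapse into one at the next.
    private void carry(double[] buffer, int level) {
        while (true) {
            if (level == levels.size()) levels.add(null);
            double[] existing = levels.get(level);
            if (existing == null) {
                levels.set(level, buffer);
                return;
            }
            levels.set(level, null);
            buffer = collapse(existing, buffer);
            level++;
        }
    }

    // Merges two sorted buffers of size k and keeps every other element (total weight is preserved).
    private double[] collapse(double[] a, double[] b) {
        double[] merged = new double[2 * k];
        int i = 0, j = 0, m = 0;
        while (i < k && j < k) merged[m++] = (a[i] <= b[j]) ? a[i++] : b[j++];
        while (i < k) merged[m++] = a[i++];
        while (j < k) merged[m++] = b[j++];
        double[] kept = new double[k];
        for (int t = 0; t < k; t++) kept[t] = merged[2 * t + 1];
        return kept;
    }

    /** Approximate q-quantile: walks the weighted samples until the cumulative weight reaches q * count. */
    public double quantile(double q) {
        if (count == 0) throw new IllegalStateException("no data");
        List<double[]> samples = new ArrayList<>(); // each entry is {value, weight}
        for (int i = 0; i < incomingCount; i++) samples.add(new double[] {incoming[i], 1});
        for (int level = 0; level < levels.size(); level++) {
            double[] buf = levels.get(level);
            if (buf == null) continue;
            double weight = (double) (1L << level);
            for (double v : buf) samples.add(new double[] {v, weight});
        }
        samples.sort((x, y) -> Double.compare(x[0], y[0]));
        double target = Math.max(1, Math.ceil(q * count));
        double cumulative = 0;
        for (double[] s : samples) {
            cumulative += s[1];
            if (cumulative >= target) return s[0];
        }
        return samples.get(samples.size() - 1)[0];
    }

    public static void main(String[] args) {
        MunroPatersonSketch sketch = new MunroPatersonSketch(64);
        for (int v = 1000; v >= 1; v--) sketch.add(v); // 1..1000 in reverse order
        System.out.println(sketch.quantile(0.5));      // close to 500; exact value depends on the collapses
    }
}
```

Each collapse can shift a value's estimated rank by at most the weight of the buffers being merged, which is why the answer is approximate rather than exact.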
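The constructor takes a single integer argument (passed as a string) that specifies the number of evenly spaced quantiles to compute; e.g., StreamingQuantile('5') computes the 0.0, 0.25, 0.5, 0.75, and 1.0 quantiles (the min, the 25th, 50th, and 75th percentiles, and the max).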
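A minimal sketch of which fractions an integer argument `k` corresponds to, assuming (consistent with the '5' usage example further down, whose output includes both the min and the max) that the `k` quantiles are `i/(k-1)` for `i = 0, ..., k-1`. The class and method names are hypothetical, and rejecting `k < 2` is an assumption of this sketch.

```java
import java.util.Arrays;

public class EvenQuantiles {
    /** Returns the k evenly spaced quantile fractions i/(k-1), including 0.0 and 1.0. */
    public static double[] evenlySpaced(int k) {
        if (k < 2) throw new IllegalArgumentException("k must be at least 2"); // assumption of this sketch
        double[] qs = new double[k];
        for (int i = 0; i < k; i++) qs[i] = (double) i / (k - 1);
        return qs;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(evenlySpaced(5))); // [0.0, 0.25, 0.5, 0.75, 1.0]
    }
}
```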
Alternatively, the constructor can take the explicit list of quantiles to compute; e.g., StreamingQuantile('0.0','0.5','1.0') computes the min, the median, and the max.
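The list of quantiles need not span the entire range from 0.0 to 1.0, nor does it need to be evenly spaced; e.g., StreamingQuantile('0.5','0.90','0.95','0.99') yields the median and the 90th, 95th, and 99th percentiles.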
Be aware when specifying the list of quantiles in this way that more quantiles may be computed internally than are actually returned. The GCD of the quantiles is found, and this determines the number of evenly spaced quantiles to compute; the requested quantiles are then returned from this set. For instance, if the quantiles 0.2 and 0.6 are requested, then 0.0, 0.2, 0.4, 0.6, 0.8, and 1.0 are computed, because 0.2 is the GCD of 0.2, 0.6, and 1.0.
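A minimal sketch of the GCD step in Java. It assumes quantiles are given with at most six decimal places (the hypothetical `SCALE` constant) so they can be handled as integers, and it includes 1.0 in the GCD so the spacing divides the full 0.0 to 1.0 range. The class and method names are hypothetical; DataFu's own handling of precision may differ.

```java
public class QuantileGcd {
    // Assumption: quantiles have at most six decimal places, so q * SCALE is an integer.
    private static final long SCALE = 1_000_000L;

    private static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    /** Size of the evenly spaced set (0.0 through 1.0) that contains every requested quantile. */
    public static int evenlySpacedCount(double... requested) {
        long g = SCALE; // start from 1.0 so the spacing also divides the full range
        for (double q : requested) {
            g = gcd(g, Math.round(q * SCALE));
        }
        return (int) (SCALE / g) + 1;
    }

    /** Position of quantile q within an evenly spaced set of the given size. */
    public static int indexIn(double q, int count) {
        return (int) Math.round(q * (count - 1));
    }

    public static void main(String[] args) {
        System.out.println(evenlySpacedCount(0.2, 0.6)); // 6  (0.0, 0.2, ..., 1.0)
        System.out.println(evenlySpacedCount(0.2, 0.7)); // 11 (0.0, 0.1, ..., 1.0)
        System.out.println(evenlySpacedCount(0.999));    // 1001
        System.out.println(indexIn(0.6, 6));             // 3
    }
}
```

This makes the cost visible: a single oddly spaced request such as 0.999 forces 1001 quantiles to be tracked internally even though only one is returned.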
The approximation error decreases as the number of buckets computed increases.
Example:
define Quantile datafu.pig.stats.StreamingQuantile('5');
-- input: 9,10,2,3,5,8,1,4,6,7
input = LOAD 'input' AS (val:int);
grouped = GROUP input ALL;
-- produces: (1.0,3.0,5.0,8.0,10.0)
quantiles = FOREACH grouped generate Quantile(input);
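For an input this small, the documented output coincides with the exact nearest-rank quantiles (the element at rank `max(1, ceil(q * n))` of the sorted data). The sketch below is a hypothetical sanity check, not DataFu's code; StreamingQuantile itself is approximate and need not match a nearest-rank rule on larger inputs.

```java
import java.util.Arrays;

public class NearestRankCheck {
    /** Exact nearest-rank quantile of a sorted array: the element at rank max(1, ceil(q * n)). */
    public static double nearestRank(double[] sorted, double q) {
        int rank = Math.max(1, (int) Math.ceil(q * sorted.length));
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        double[] data = {9, 10, 2, 3, 5, 8, 1, 4, 6, 7};
        Arrays.sort(data);
        double[] qs = {0.0, 0.25, 0.5, 0.75, 1.0}; // what StreamingQuantile('5') requests
        double[] result = new double[qs.length];
        for (int i = 0; i < qs.length; i++) result[i] = nearestRank(data, qs[i]);
        System.out.println(Arrays.toString(result)); // [1.0, 3.0, 5.0, 8.0, 10.0]
    }
}
```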
See Also: StreamingMedian, Quantile
Constructor and Description |
---|
StreamingQuantile(java.lang.String... k) |
Modifier and Type | Method and Description |
---|---|
void | accumulate(org.apache.pig.data.Tuple b) |
void | cleanup() |
org.apache.pig.data.Tuple | getValue() |
org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
Methods inherited from class org.apache.pig.EvalFunc: allowCompileTimeCalculation, finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, getSchemaType, getShipFiles, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
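public void accumulate(org.apache.pig.data.Tuple b) throws java.io.IOException
Passes a bag of tuples for the current key to the UDF. The bag contains records from only one key, but it may not contain all of that key's records; the method is called repeatedly until every record for the key has been provided.
Specified by: accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.Tuple>
Specified by: accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.Tuple>
Throws: java.io.IOException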
public void cleanup()
Called after getValue() to prepare for processing the next key.
Specified by: cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.Tuple>
Specified by: cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.Tuple>
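public org.apache.pig.data.Tuple getValue()
Called once all tuples for the current key have been passed to accumulate; returns the computed quantiles as a tuple.
Specified by: getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.Tuple>
Specified by: getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.Tuple>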
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.Tuple>