@Nondeterministic
public class Sessionize
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
This UDF groups time-ordered events into sessions. Its constructor argument is the session timeout: an idle period of this length indicates that a new session has started. The first field of each tuple in the input bag must be an ISO 8601 timestamp, and the input bag must be sorted by this timestamp. The UDF returns the input bag with one new field appended, session_id, a GUID identifying the session that the request belongs to.
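The timeout rule described above can be sketched in plain Java, without any Pig dependency. This is only an illustration under stated assumptions, not the UDF's implementation: `SessionizeSketch` and `assignSessions` are hypothetical names, and treating a gap exactly equal to the timeout as part of the same session is an assumption of this sketch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Illustrative only: a plain-Java sketch of timeout-based sessionization.
// SessionizeSketch and assignSessions are hypothetical names, not DataFu API.
class SessionizeSketch {

    // Returns one session id per timestamp. Timestamps must already be
    // sorted ascending, mirroring the UDF's sorted-bag requirement.
    static List<String> assignSessions(List<String> isoTimestamps, Duration timeout) {
        List<String> sessionIds = new ArrayList<>();
        Instant previous = null;
        String currentId = null;
        for (String ts : isoTimestamps) {
            Instant current = Instant.parse(ts); // ISO 8601, e.g. 2024-01-01T10:00:00Z
            // Start a new session on the first event, or when the idle gap
            // since the previous event exceeds the timeout (assumption: a gap
            // exactly equal to the timeout stays in the same session).
            if (previous == null
                    || Duration.between(previous, current).compareTo(timeout) > 0) {
                currentId = UUID.randomUUID().toString();
            }
            sessionIds.add(currentId);
            previous = current;
        }
        return sessionIds;
    }

    public static void main(String[] args) {
        List<String> visits = List.of(
            "2024-01-01T10:00:00Z",
            "2024-01-01T10:10:00Z",  // 10 minute gap: same session
            "2024-01-01T11:00:00Z"); // 50 minute gap: new session
        List<String> ids = assignSessions(visits, Duration.ofMinutes(30));
        for (int i = 0; i < visits.size(); i++) {
            System.out.println(visits.get(i) + " -> " + ids.get(i));
        }
    }
}
```

With a 30 minute timeout, the first two visits share a session id and the third receives a new one. Because the ids are random GUIDs, repeated runs yield different values, which is consistent with the class being annotated @Nondeterministic.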
Example:
%declare TIME_WINDOW 30m
define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
views = LOAD 'views.tsv' AS (visit_date:chararray, member_id:int, url:chararray);
-- sessionize the visit stream
views = GROUP views BY member_id;
sessions = FOREACH views {
  visits = ORDER views BY visit_date;
  GENERATE FLATTEN(Sessionize(visits)) AS (visit_date,member_id,url,session_id);
}
-- count the number of sessions hitting the url
rollup = GROUP sessions BY url;
result = FOREACH rollup GENERATE group AS url, COUNT(sessions) AS session_cnt;
Constructor and Description |
---|
Sessionize(java.lang.String timeSpec) |
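The constructor takes the timeout as a string such as '30m'. As a rough illustration of how such a spec could map to a duration, here is a minimal sketch. `TimeSpecSketch` and `parse` are hypothetical names, and the set of unit suffixes (d, h, m, s) is an assumption; this page only shows the `30m` form, so this should not be read as the parser the UDF uses.

```java
import java.time.Duration;

// Hypothetical helper, not DataFu's parser: converts a spec such as "30m"
// into a Duration. The d/h/m/s suffixes are an assumption for illustration;
// the documentation above only shows the "30m" form.
class TimeSpecSketch {

    static Duration parse(String timeSpec) {
        String spec = timeSpec.trim();
        if (spec.length() < 2) {
            throw new IllegalArgumentException("Expected <number><unit>, got: " + timeSpec);
        }
        // Everything except the last character is the numeric amount.
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        char unit = spec.charAt(spec.length() - 1);
        switch (unit) {
            case 'd': return Duration.ofDays(amount);
            case 'h': return Duration.ofHours(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 's': return Duration.ofSeconds(amount);
            default:
                throw new IllegalArgumentException("Unknown unit in time spec: " + timeSpec);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("30m").toMillis()); // prints 1800000
    }
}
```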
Modifier and Type | Method and Description |
---|---|
void | accumulate(org.apache.pig.data.Tuple input) |
void | cleanup() |
org.apache.pig.data.DataBag | getValue() |
org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
Methods inherited from class org.apache.pig.EvalFunc: allowCompileTimeCalculation, finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, getSchemaType, getShipFiles, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
public void accumulate(org.apache.pig.data.Tuple input) throws java.io.IOException
Specified by: accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by: accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
Throws: java.io.IOException
public org.apache.pig.data.DataBag getValue()
Specified by: getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by: getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
public void cleanup()
Specified by: cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by: cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>