@Nondeterministic
public class Sessionize
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
This UDF takes one constructor argument, the session timeout: an idle period of this length indicates that a new session has started. It assumes that the first field of each tuple in the input bag is an ISO 8601 timestamp, and the input bag must be sorted by this timestamp. It returns the input bag with a new field, session_id, appended to each tuple; session_id is a GUID identifying the session that the request belongs to.
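The core sessionization rule can be sketched in plain Java, independent of Pig. This is a minimal sketch, not DataFu's implementation: `SessionizeSketch` and `assignSessions` are hypothetical names, timestamps are assumed to be UTC instants ending in `Z` (the form `java.time.Instant.parse` accepts), and a gap strictly longer than the timeout is assumed to be what starts a new session.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class SessionizeSketch {
    /**
     * Returns one session id per timestamp. Input must be sorted ascending;
     * a gap strictly longer than {@code timeout} starts a new session (assumed rule).
     */
    public static List<String> assignSessions(List<String> isoTimestamps, Duration timeout) {
        List<String> ids = new ArrayList<>();
        Instant last = null;
        String currentId = null;
        for (String ts : isoTimestamps) {
            Instant t = Instant.parse(ts); // expects UTC form such as 2024-01-01T10:00:00Z
            if (last != null && t.isBefore(last)) {
                throw new IllegalArgumentException("input is not sorted: " + t + " < " + last);
            }
            // First element, or an idle gap longer than the timeout: start a new session.
            if (last == null || Duration.between(last, t).compareTo(timeout) > 0) {
                currentId = UUID.randomUUID().toString();
            }
            ids.add(currentId);
            last = t;
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> timestamps = List.of(
            "2024-01-01T10:00:00Z",   // session A
            "2024-01-01T10:20:00Z",   // 20 min later: still session A
            "2024-01-01T11:30:00Z");  // 70 min later: new session B
        List<String> ids = assignSessions(timestamps, Duration.ofMinutes(30));
        for (int i = 0; i < timestamps.size(); i++) {
            System.out.println(timestamps.get(i) + "\t" + ids.get(i));
        }
    }
}
```

Rejecting out-of-order input with an exception, rather than silently opening extra sessions, is one way a sketch like this can enforce the sorting requirement.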
Example:
```pig
%declare TIME_WINDOW 30m
define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');

views = LOAD 'views.tsv' AS (visit_date:chararray, member_id:int, url:chararray);

-- sessionize the visit stream
views = GROUP views BY member_id;
sessions = FOREACH views {
  visits = ORDER views BY visit_date;
  GENERATE FLATTEN(Sessionize(visits)) AS (visit_date, member_id, url, session_id);
}

-- count the number of distinct sessions hitting each url
rollup = GROUP sessions BY url;
result = FOREACH rollup {
  session_ids = DISTINCT sessions.session_id;
  GENERATE group AS url, COUNT(session_ids) AS session_cnt;
}
```
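The rollup groups sessionized views by url, so a session that hits the same url twice contributes two rows to that group; counting sessions rather than views means counting distinct session_id values. A plain-Java sketch of that distinction, using hypothetical `{url, session_id}` pairs in place of Pig relations (`SessionsPerUrlSketch` is an illustrative name, not part of DataFu):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SessionsPerUrlSketch {
    /** Counts distinct session ids per url; each view is a {url, session_id} pair. */
    public static Map<String, Integer> distinctSessionsPerUrl(List<String[]> views) {
        Map<String, Set<String>> sessionsByUrl = new TreeMap<>();
        for (String[] view : views) {
            // A Set drops repeat visits from the same session.
            sessionsByUrl.computeIfAbsent(view[0], k -> new HashSet<>()).add(view[1]);
        }
        Map<String, Integer> counts = new TreeMap<>();
        sessionsByUrl.forEach((url, ids) -> counts.put(url, ids.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> views = List.of(
            new String[] {"/home", "s1"},
            new String[] {"/home", "s1"},
            new String[] {"/home", "s2"},
            new String[] {"/search", "s1"});
        // Three views of /home, but only two distinct sessions.
        System.out.println(distinctSessionsPerUrl(views)); // {/home=2, /search=1}
    }
}
```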
| Constructor and Description |
|---|
| Sessionize(java.lang.String timeSpec) |
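The constructor's timeSpec is written as an amount plus a unit, as in the `30m` of the example above. One way to turn such a spec into a duration is sketched below. It assumes, hypothetically, that the spec is the time part of an ISO 8601 duration without its `PT` prefix, so only hour, minute and second units such as `30m`, `2h` or `1h30m` are handled; `TimeSpecSketch` and `parseTimeSpec` are illustrative names, not DataFu API.

```java
import java.time.Duration;
import java.util.Locale;

public class TimeSpecSketch {
    /**
     * Hypothetical parser: "30m" becomes "PT30M", which java.time.Duration.parse accepts.
     * Throws java.time.format.DateTimeParseException for specs it cannot read.
     */
    public static Duration parseTimeSpec(String timeSpec) {
        return Duration.parse("PT" + timeSpec.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parseTimeSpec("30m").toMillis());  // 1800000
        System.out.println(parseTimeSpec("2h").toMinutes());  // 120
    }
}
```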
| Modifier and Type | Method and Description |
|---|---|
| void | accumulate(org.apache.pig.data.Tuple input) |
| void | cleanup() |
| org.apache.pig.data.DataBag | getValue() |
| org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
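Because Sessionize is an accumulator, Pig can hand it one group's bag in several batches: accumulate is called per batch, getValue once after the last batch, and cleanup before the next group. The model below mirrors that contract in plain Java. It is a hypothetical stand-in, not DataFu's code: rows are assumed to be `List<String>` with an ISO 8601 UTC timestamp first, in place of Pig's Tuple and DataBag. What it illustrates is that the previous timestamp and the current session id must survive between accumulate calls, so one session can span two batches.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical plain-Java model of the accumulate/getValue/cleanup contract. */
public final class SessionAccumulatorSketch {
    private final Duration timeout;
    private List<List<String>> output;
    private Instant last;      // timestamp of the previous row, kept across batches
    private String sessionId;  // id of the current session, kept across batches

    public SessionAccumulatorSketch(Duration timeout) {
        this.timeout = timeout;
        cleanup();
    }

    /** Processes one batch of rows; row.get(0) is an ISO 8601 UTC timestamp. */
    public void accumulate(List<List<String>> batch) {
        for (List<String> row : batch) {
            Instant t = Instant.parse(row.get(0));
            if (last != null && t.isBefore(last)) {
                throw new IllegalStateException("input is not sorted: " + t + " < " + last);
            }
            if (last == null || Duration.between(last, t).compareTo(timeout) > 0) {
                sessionId = UUID.randomUUID().toString();
            }
            List<String> withId = new ArrayList<>(row);
            withId.add(sessionId); // append session_id as the last field
            output.add(withId);
            last = t;
        }
    }

    /** Returns every row seen since the last cleanup(), each with its session id. */
    public List<List<String>> getValue() {
        return output;
    }

    /** Resets all state so the next group starts fresh. */
    public void cleanup() {
        output = new ArrayList<>();
        last = null;
        sessionId = null;
    }

    public static void main(String[] args) {
        SessionAccumulatorSketch acc = new SessionAccumulatorSketch(Duration.ofMinutes(30));
        // One group's bag, delivered in two batches.
        acc.accumulate(List.of(List.of("2024-01-01T10:00:00Z", "/home")));
        acc.accumulate(List.of(
            List.of("2024-01-01T10:10:00Z", "/search"),   // same session as /home
            List.of("2024-01-01T12:00:00Z", "/home")));   // new session
        for (List<String> row : acc.getValue()) {
            System.out.println(String.join("\t", row));
        }
        acc.cleanup(); // ready for the next group
    }
}
```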
Methods inherited from class org.apache.pig.EvalFunc: allowCompileTimeCalculation, finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, getSchemaType, getShipFiles, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn

public void accumulate(org.apache.pig.data.Tuple input) throws java.io.IOException

Passes the next batch of tuples to the UDF: input holds a single field, a bag containing the tuples for this iteration.

Specified by: accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>; accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>. Throws: java.io.IOException

public org.apache.pig.data.DataBag getValue()

Called after all tuples for the current key have been passed to accumulate; returns the resulting bag.

Specified by: getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>; getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

public void cleanup()

Called after getValue() to prepare for processing the next key.

Specified by: cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>; cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>

public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)

Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>