datafu.pig.sessions
Class Sessionize
java.lang.Object
org.apache.pig.EvalFunc<T>
org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
datafu.pig.sessions.Sessionize
- All Implemented Interfaces:
- org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
@Nondeterministic
public class Sessionize
- extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
Sessionizes an input stream, appending a session ID to each tuple.
This UDF takes one constructor argument, the session timeout: an idle
period of this length between consecutive timestamps indicates that a new
session has started. It assumes that the first field of each tuple in the
input bag is an ISO8601 timestamp, and the input bag must be sorted by this
timestamp. It returns the input bag with a new field, session_id, appended
to each tuple; session_id is a GUID identifying the session of the request.
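To make the sessionization rule concrete, here is a minimal plain-Java sketch that works on an in-memory list instead of a Pig bag. It is not DataFu's implementation. The `SessionizeSketch` class, the `String[]` row layout, the use of `java.time.Instant.parse` (which expects instants such as `2013-01-01T10:00:00Z`), and the choice that a new session starts only when the gap strictly exceeds the timeout are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Illustrative sketch only, not DataFu code.
// Assumptions: each row is a String[] whose field 0 is an ISO-8601 instant
// ending in 'Z', rows are sorted by that instant, and a new session starts
// only when the gap to the previous row strictly exceeds the timeout.
final class SessionizeSketch {
    static List<String[]> sessionize(List<String[]> sortedRows, Duration timeout) {
        List<String[]> out = new ArrayList<>();
        Instant last = null;
        String sessionId = UUID.randomUUID().toString();
        for (String[] row : sortedRows) {
            Instant t = Instant.parse(row[0]); // field 0 holds the timestamp
            if (last != null) {
                if (t.isBefore(last)) {
                    throw new IllegalArgumentException("input is not sorted by timestamp");
                }
                if (Duration.between(last, t).compareTo(timeout) > 0) {
                    sessionId = UUID.randomUUID().toString(); // idle gap: start a new session
                }
            }
            // Copy the row and append session_id as the last field.
            String[] withId = Arrays.copyOf(row, row.length + 1);
            withId[row.length] = sessionId;
            out.add(withId);
            last = t;
        }
        return out;
    }
}
```

Because the IDs come from `UUID.randomUUID()`, their values differ from run to run; only the grouping of rows into sessions is repeatable, which is presumably why the class is marked `@Nondeterministic`.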
Example:
%declare TIME_WINDOW 30m
define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
views = LOAD 'views.tsv' AS (visit_date:chararray, member_id:int, url:chararray);
-- sessionize the visit stream
views = GROUP views BY member_id;
sessions = FOREACH views {
visits = ORDER views BY visit_date;
GENERATE FLATTEN(Sessionize(visits)) AS (visit_date,member_id,url,session_id);
}
-- count the number of distinct sessions hitting each url
rollup = GROUP sessions BY url;
result = FOREACH rollup {
session_ids = DISTINCT sessions.session_id;
GENERATE group AS url, COUNT(session_ids) AS session_cnt;
}
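The rollup at the end of the example is meant to count sessions per URL. A session that visits the same URL several times contributes several tuples to that URL's group, so counting tuples (visits) and counting distinct session IDs can give different answers. The hypothetical `SessionRollupSketch` below shows both computations side by side in plain Java, assuming each row has already been reduced to `{url, session_id}`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration, not part of DataFu.
// Assumption: each row is {url, session_id}.
final class SessionRollupSketch {
    // Number of distinct sessions that hit each URL.
    static Map<String, Integer> distinctSessionsPerUrl(List<String[]> rows) {
        Map<String, Set<String>> seen = new HashMap<>();
        for (String[] row : rows) {
            seen.computeIfAbsent(row[0], k -> new HashSet<>()).add(row[1]);
        }
        Map<String, Integer> counts = new HashMap<>();
        seen.forEach((url, ids) -> counts.put(url, ids.size()));
        return counts;
    }

    // Number of rows (visits) per URL, i.e. what counting every tuple in the group gives.
    static Map<String, Integer> visitsPerUrl(List<String[]> rows) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] row : rows) {
            counts.merge(row[0], 1, Integer::sum);
        }
        return counts;
    }
}
```

With rows `{/a, s1}`, `{/a, s1}`, `{/a, s2}`, `{/b, s1}`, URL `/a` has 3 visits but only 2 distinct sessions.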
Fields inherited from class org.apache.pig.EvalFunc:
log, pigLogger, reporter, returnType
Constructor Summary
Sessionize(java.lang.String timeSpec)
Method Summary
void accumulate(org.apache.pig.data.Tuple input)
void cleanup()
org.apache.pig.data.DataBag getValue()
org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Methods inherited from class org.apache.pig.AccumulatorEvalFunc:
exec
Methods inherited from class org.apache.pig.EvalFunc:
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
Methods inherited from class java.lang.Object:
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Sessionize
public Sessionize(java.lang.String timeSpec)
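This page does not document the grammar of `timeSpec` beyond the `30m` used in the example. As an illustration only, the hypothetical helper below assumes a spec is a sequence of number-and-unit tokens (`h`, `m`, `s`) and converts it by prefixing `PT` and parsing the result as an ISO-8601 duration with `java.time.Duration.parse`. DataFu's own parsing may accept a different set of formats.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical helper, not DataFu's parser.
// Assumption: timeSpec looks like "30m", "1h30m" or "45s", so "PT" + spec
// (upper-cased) is a valid ISO-8601 time-based duration such as "PT30M".
final class TimeSpecSketch {
    static Duration parseTimeSpec(String timeSpec) {
        return Duration.parse("PT" + timeSpec.toUpperCase(Locale.ROOT));
    }
}
```

For example, `parseTimeSpec("30m")` yields a 30-minute `Duration` and `parseTimeSpec("1h30m")` yields 90 minutes; an unrecognized spec makes `Duration.parse` throw `java.time.format.DateTimeParseException`.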
accumulate
public void accumulate(org.apache.pig.data.Tuple input)
throws java.io.IOException
- Specified by: accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by: accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
- Throws: java.io.IOException
getValue
public org.apache.pig.data.DataBag getValue()
- Specified by: getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by: getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
cleanup
public void cleanup()
- Specified by: cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
- Specified by: cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
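Sessionize is an accumulator UDF. Under Pig's Accumulator contract, accumulate may be called several times per group, each time with a chunk of the group's bag; getValue then returns the result, and cleanup resets state before the next group. Because a session can span two chunks, the last timestamp and the current session ID have to persist between accumulate calls. The plain-Java sketch below models that lifecycle without a Pig dependency; the `SessionAccumulatorSketch` class, its `List<String[]>` chunks, and the strict-gap rule are assumptions, not DataFu's code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Illustrative model of the accumulate/getValue/cleanup lifecycle (not DataFu code).
// Assumptions: each row is a String[] with an ISO-8601 instant ('Z' suffix) in
// field 0, chunks arrive in timestamp order, and a new session starts only
// when the gap strictly exceeds the timeout.
final class SessionAccumulatorSketch {
    private final Duration timeout;
    private List<String[]> output;
    private Instant last;      // survives across accumulate() calls
    private String sessionId;  // survives across accumulate() calls

    SessionAccumulatorSketch(Duration timeout) {
        this.timeout = timeout;
        cleanup(); // start in a fresh per-group state
    }

    // Called once per chunk of the current group's sorted bag.
    void accumulate(List<String[]> chunk) {
        for (String[] row : chunk) {
            Instant t = Instant.parse(row[0]);
            if (last != null && Duration.between(last, t).compareTo(timeout) > 0) {
                sessionId = UUID.randomUUID().toString(); // gap exceeded: new session
            }
            String[] withId = Arrays.copyOf(row, row.length + 1);
            withId[row.length] = sessionId; // append session_id
            output.add(withId);
            last = t;
        }
    }

    // Called after the last chunk of the group.
    List<String[]> getValue() {
        return output;
    }

    // Called after getValue() to prepare for the next group.
    void cleanup() {
        output = new ArrayList<>();
        last = null;
        sessionId = UUID.randomUUID().toString();
    }
}
```

In this model, two rows ten minutes apart that arrive in separate accumulate calls still share a session ID, while a row arriving after cleanup gets a fresh one.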
outputSchema
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
- Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
- Author:
Matthew Hayes, Sam Shah