public class PageRank
extends org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
This is not a distributed implementation. Each graph is stored in memory while running the algorithm, with edges optionally spilled to disk to conserve memory. This can be used to distribute the execution of PageRank across multiple reasonably sized graphs. It does not distribute the execution of PageRank within any individual graph. Each graph is identified by an integer-valued topic ID.
If a graph is too large to fit in memory, then an alternative method must be used, such as an iterative approach that runs a sequence of MapReduce jobs to complete the PageRank iterations.
Each graph is represented as a bag of (source,edges) tuples. The 'source' is an integer ID representing the source node. The 'edges' are the outgoing edges from the source node, represented as a bag of (dest,weight) tuples. The 'dest' is an integer ID representing the destination node. The 'weight' is a double representing how much the edge should be weighted. For a standard PageRank implementation, just use a weight of 1.0.
The output of the UDF is a bag of (source,rank) pairs, where 'rank' is the PageRank value for that source in the graph.
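To make the nested structure concrete, here is a minimal Java sketch that builds one (source,edges) entry of such a graph bag using Pig's TupleFactory and BagFactory. The class name, method name, and node IDs are illustrative only and not part of the UDF's API.

import java.util.Arrays;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Illustrative only: shows the (source,edges) structure the UDF consumes.
public class PageRankInputSketch
{
  public static DataBag buildGraph()
  {
    TupleFactory tupleFactory = TupleFactory.getInstance();
    BagFactory bagFactory = BagFactory.getInstance();

    // Outgoing edges of source node 1: (dest,weight) tuples; weight 1.0 for standard PageRank
    DataBag edgesFromNode1 = bagFactory.newDefaultBag();
    edgesFromNode1.add(tupleFactory.newTuple(Arrays.<Object>asList(2, 1.0)));
    edgesFromNode1.add(tupleFactory.newTuple(Arrays.<Object>asList(3, 1.0)));

    // One (source,edges) tuple of the graph
    Tuple node1 = tupleFactory.newTuple(Arrays.<Object>asList(1, edgesFromNode1));

    // The bag passed to the UDF holds one such tuple per source node;
    // the UDF returns a bag of (source,rank) tuples for those same nodes.
    DataBag graph = bagFactory.newDefaultBag();
    graph.add(node1);
    return graph;
  }
}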
There are several configurable options for this UDF. Parameters are configured by passing them to the UDF constructor as a sequence of name/value pairs; all arguments must be strings. For example, below the alpha value is set to 0.87 and dangling nodes are enabled.
define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
Full example:
topic_edges = LOAD 'input_edges' as (topic:INT,source:INT,dest:INT,weight:DOUBLE);

topic_edges_grouped = GROUP topic_edges by (topic, source);
topic_edges_grouped = FOREACH topic_edges_grouped GENERATE
    group.topic as topic,
    group.source as source,
    topic_edges.(dest,weight) as edges;

topic_edges_grouped_by_topic = GROUP topic_edges_grouped BY topic;

topic_ranks = FOREACH topic_edges_grouped_by_topic GENERATE
    group as topic,
    FLATTEN(PageRank(topic_edges_grouped.(source,edges))) as (source,rank);

topic_ranks = FOREACH topic_ranks GENERATE
    topic, source, rank;
| Constructor and Description |
| --- |
| PageRank() |
| PageRank(java.lang.String... parameters) |
| Modifier and Type | Method and Description |
| --- | --- |
| void | accumulate(org.apache.pig.data.Tuple t) |
| void | cleanup() |
| org.apache.pig.data.DataBag | getValue() |
| org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
Methods inherited from class org.apache.pig.EvalFunc: allowCompileTimeCalculation, finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, getSchemaType, getShipFiles, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
public PageRank()
public PageRank(java.lang.String... parameters)
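As a rough illustration, constructing the UDF directly in Java with the same parameter pairs used in the DEFINE statement above would look like this (values taken from the earlier example):

// Same configuration as: define PageRank datafu.pig.linkanalysis.PageRank('alpha','0.87','dangling_nodes','true');
PageRank pageRank = new PageRank("alpha", "0.87", "dangling_nodes", "true");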
public void accumulate(org.apache.pig.data.Tuple t) throws java.io.IOException
Specified by: accumulate in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by: accumulate in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
Throws: java.io.IOException
public org.apache.pig.data.DataBag getValue()
Specified by: getValue in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by: getValue in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
public void cleanup()
Specified by: cleanup in interface org.apache.pig.Accumulator<org.apache.pig.data.DataBag>
Specified by: cleanup in class org.apache.pig.AccumulatorEvalFunc<org.apache.pig.data.DataBag>
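For context, here is a rough sketch of how the accumulator lifecycle fits together for a single graph. The driver loop and the graphChunks iterable are purely illustrative stand-ins for Pig's internal handling, not a real API; Pig itself decides how the input bag is split across accumulate calls.

import java.io.IOException;

import org.apache.pig.data.DataBag;
import org.apache.pig.data.TupleFactory;

import datafu.pig.linkanalysis.PageRank;

// Illustrative driver: shows the order of accumulate/getValue/cleanup calls for one graph.
public class AccumulatorLifecycleSketch
{
  public static DataBag rank(PageRank pageRank, Iterable<DataBag> graphChunks) throws IOException
  {
    TupleFactory tupleFactory = TupleFactory.getInstance();
    for (DataBag chunk : graphChunks)
    {
      // Each accumulate call receives a tuple whose single field is a bag of (source,edges) tuples
      pageRank.accumulate(tupleFactory.newTuple(chunk));
    }
    DataBag ranks = pageRank.getValue(); // bag of (source,rank) tuples for this graph
    pageRank.cleanup();                  // reset state before processing the next graph
    return ranks;
  }
}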
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>