datafu.pig.sampling
Class SimpleRandomSampleWithReplacementVote
java.lang.Object
org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
datafu.pig.sampling.SimpleRandomSampleWithReplacementVote
public class SimpleRandomSampleWithReplacementVote
- extends org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
Scalable simple random sampling with replacement (ScaSRSWR).
Together with SimpleRandomSampleWithReplacementElect, this UDF implements a
scalable algorithm for simple random sampling with replacement (SRSWR). It is a
randomized algorithm with a failure rate less than 1.0E-4.
Let s be the desired sample size. To compute an SRSWR sample of size s, for each output
position in {0, 1, ..., s-1}, we want to select an item from the population uniformly
at random. This algorithm consists of two stages: vote and election. In the vote stage,
this UDF, SimpleRandomSampleWithReplacementVote, proposes items, called candidates,
for each position. In the election stage, the paired UDF,
SimpleRandomSampleWithReplacementElect, elects one candidate for each position.
The algorithm succeeds if we have at least one candidate for each position.
To use this UDF pair, the user needs to provide: 1) the desired sample size, 2) a good
lower bound of the population size or the exact size. The input to the vote UDF,
SimpleRandomSampleWithReplacementVote, is a tuple that consists of a bag of
items, the desired sample size (int), and the population size (long) or a good lower
bound of it, where the latter two must be scalars. The output from the vote UDF is a
bag of tuples, each consisting of position:int, score:double, and candidate. The input
to the elect UDF, SimpleRandomSampleWithReplacementElect, is a tuple that contains all
candidates voted by the vote UDF for some positions. The output from the elect UDF is a
bag of sampled items.
For example, the following script generates a sample of size 100000 with replacement:
DEFINE SRSWR_VOTE datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
item = LOAD 'input' AS (x:double);
summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
sampled = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE FLATTEN(SRSWR_ELECT(candidates));
Because election only needs to group the candidates voted for the same position, this
algorithm can use many reducers to consume the candidates; see the "PARALLEL 10"
clause above. If the item to sample is the entire row, use TOBAG(TOTUPLE(*)).
SRSWR is heavily used in bootstrapping. Bootstrapping can be done easily with this UDF
pair. For example, the following script generates 100 bootstrap samples, computes the
mean value for each sample, and then outputs the bootstrap estimates.
summary = FOREACH (GROUP item ALL) GENERATE AVG(item.x) AS mean, COUNT(item) AS count;
candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
sampled = FOREACH (GROUP candidates BY (position % 100) PARALLEL 10) GENERATE AVG(SRSWR_ELECT(candidates)) AS mean;
bootstrap = FOREACH (GROUP sampled ALL) GENERATE summary.mean AS mean, sampled.mean AS bootstrapMeans;
Another usage of this UDF pair is to generate random pairs or tuples without computing
the cross product, where each pair or tuple consists of items from different input
sources. Let s be the number of random tuples we want to generate. For each input
source, simply use the vote UDF to propose candidates, then join the candidates from
different sources by their positions and for each position use the elect UDF to select
one candidate from each source to form the pair or tuple for that position.
The algorithm is a simple extension of the following work:
X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
Basically, for each output position, it performs a random sort on the population
(associates each item with a random score independently drawn from the uniform
distribution and then sorts items based on the scores), and picks the one that has the
smallest score. However, a probabilistic threshold is used to avoid sorting the entire
population. For example, if the population size is one billion and the random score
generated for an item is 0.9, very likely it won't become the smallest and hence we do
not need to propose it as a candidate.
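The random-sort view described above can be sketched in Java as follows. This is a deliberately naive illustration, not the DataFu implementation: the class NaiveSrswr and its method are hypothetical names, and items are represented by their indices 0..n-1. It scores every (item, position) pair, which is exactly the cost the probabilistic threshold is meant to avoid.

```java
import java.util.Random;

/** Illustrative sketch only; names are hypothetical, not DataFu's code. */
final class NaiveSrswr {
    /**
     * For each of the s output positions, gives every item an independent
     * uniform score and keeps the index of the item with the smallest score.
     */
    static int[] sample(int n, int s, Random rng) {
        int[] chosen = new int[s];
        for (int pos = 0; pos < s; pos++) {
            double best = Double.POSITIVE_INFINITY;
            for (int item = 0; item < n; item++) {
                double score = rng.nextDouble();
                if (score < best) {
                    best = score;
                    chosen[pos] = item;
                }
            }
        }
        return chosen;
    }
}
```

The work here is proportional to n*s, which motivates discarding scores that are very unlikely to be the minimum.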
More precisely, let n be the population size, n1 be a good lower bound of n, s be the
sample size, delta be the failure rate, and q be the threshold. For each output
position the probability of all random scores being greater than q is (1-q)^n. Thus, if
we throw away items with associated scores greater than q, with probability at least
1 - s*(1-q)^n, we can still capture the item with the smallest score for each position.
Fixing delta = s*(1-q)^n and solving for q, we get q = 1-exp(log(delta/s)/n). Note that
replacing n by n1 < n can only decrease the failure rate, though at the cost of an
increased number of candidates. The expected number of candidates is
(1 - exp(log(delta/s)/n1))*s*n. When n1 equals n, this number is approximately
s*log(s/delta).
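The threshold and the expected candidate count can be evaluated numerically with a few lines of Java. This is a sketch of the formulas in the paragraph above, not code taken from the UDF; the class and method names are hypothetical, and Math.expm1 is used only as a numerically safer way to compute 1 - exp(x) when x is close to zero.

```java
/** Illustrative sketch only; names are hypothetical, not DataFu's code. */
final class SrswrThreshold {
    /** q = 1 - exp(log(delta/s)/n1), written as -expm1(x) for precision. */
    static double threshold(long n1, long s, double delta) {
        return -Math.expm1(Math.log(delta / s) / n1);
    }

    /** Expected number of candidates: q*s*n, with q computed from n1. */
    static double expectedCandidates(long n, long n1, long s, double delta) {
        return threshold(n1, s, delta) * s * n;
    }
}
```

With n1 equal to n, the value returned by expectedCandidates should be close to s*log(s/delta); with a smaller n1 the threshold grows and so does the number of candidates.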
Generating a random score for each (item, position) pair is very expensive and
unnecessary. For each item, the number of positions for which it gets voted follows a
binomial distribution B(s,q). We can simply draw a number from this distribution,
determine the positions by sampling without replacement, and then generate random
scores for those positions. This reduces the running time significantly.
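The per-item shortcut described above might look like the following Java sketch. It is an illustration under stated assumptions, not the UDF's actual code: the names VoteSketch, Candidate, binomial, and vote are hypothetical; the binomial draw uses a simple geometric waiting-time method; positions are chosen with Floyd's algorithm for sampling without replacement; and each score is assumed to be drawn uniformly from [0, q), which is the distribution of a uniform score conditioned on falling below the threshold.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Illustrative sketch only; names are hypothetical, not DataFu's code. */
final class VoteSketch {
    /** One proposed candidate: (position, score, item). */
    static final class Candidate {
        final int position;
        final double score;
        final Object item;

        Candidate(int position, double score, Object item) {
            this.position = position;
            this.score = score;
            this.item = item;
        }
    }

    /** Draws from B(s, q) by counting geometric waiting times that fit in s trials. */
    static int binomial(int s, double q, Random rng) {
        if (q <= 0.0) return 0;
        if (q >= 1.0) return s;
        double logFail = Math.log1p(-q); // log(1 - q), a negative number
        int successes = 0;
        double trialsUsed = 0.0;
        while (true) {
            double u = 1.0 - rng.nextDouble(); // uniform on (0, 1]
            // Number of trials up to and including the next success.
            trialsUsed += Math.floor(Math.log(u) / logFail) + 1.0;
            if (trialsUsed > s) return successes;
            successes++;
        }
    }

    /** Proposes the given item as a candidate for a random set of positions. */
    static List<Candidate> vote(Object item, int s, double q, Random rng) {
        int k = binomial(s, q, rng);
        // Floyd's algorithm: k distinct positions from {0, ..., s-1}.
        Set<Integer> positions = new LinkedHashSet<>();
        for (int j = s - k; j < s; j++) {
            int t = rng.nextInt(j + 1);
            if (!positions.add(t)) positions.add(j);
        }
        List<Candidate> out = new ArrayList<>(k);
        for (int p : positions) {
            // Assumed: a score known to be below q is uniform on [0, q).
            out.add(new Candidate(p, q * rng.nextDouble(), item));
        }
        return out;
    }
}
```

Because q is tiny for large populations, most items receive zero votes and the work per item is proportional to the number of votes rather than to s.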
Since for each position we only need the candidate with the smallest score, we
implement a combiner to reduce the size of intermediate data in the elect UDF,
SimpleRandomSampleWithReplacementElect.
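The reduction that such a combiner relies on is a per-position minimum over scores. The Java sketch below shows only that idea; it does not use Pig's combiner interfaces, and the names ElectSketch, Scored, and offer are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; names are hypothetical, not DataFu's code. */
final class ElectSketch {
    /** A candidate item together with its random score. */
    static final class Scored {
        final double score;
        final Object item;

        Scored(double score, Object item) {
            this.score = score;
            this.item = item;
        }
    }

    /** Keeps, for each position, only the candidate with the smallest score seen so far. */
    static void offer(Map<Integer, Scored> best, int position, double score, Object item) {
        Scored current = best.get(position);
        if (current == null || score < current.score) {
            best.put(position, new Scored(score, item));
        }
    }
}
```

Taking a minimum is associative and commutative, so it can be applied to partial groups of candidates before the final election without changing the result.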
- Author:
- ximeng
- See Also:
SimpleRandomSampleWithReplacementElect,
Bootstrapping (Wikipedia)
Fields inherited from class org.apache.pig.EvalFunc
log, pigLogger, reporter, returnType
Method Summary
org.apache.pig.data.DataBag exec(org.apache.pig.data.Tuple tuple)
org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Methods inherited from class org.apache.pig.EvalFunc
finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
OUTPUT_BAG_NAME_PREFIX
public static final java.lang.String OUTPUT_BAG_NAME_PREFIX
- See Also:
- Constant Field Values
CANDIDATE_FIELD_NAME
public static final java.lang.String CANDIDATE_FIELD_NAME
- See Also:
- Constant Field Values
POSITION_FIELD_NAME
public static final java.lang.String POSITION_FIELD_NAME
- See Also:
- Constant Field Values
SCORE_FIELD_NAME
public static final java.lang.String SCORE_FIELD_NAME
- See Also:
- Constant Field Values
FAILURE_RATE
public static final double FAILURE_RATE
- See Also:
- Constant Field Values
SimpleRandomSampleWithReplacementVote
public SimpleRandomSampleWithReplacementVote()
exec
public org.apache.pig.data.DataBag exec(org.apache.pig.data.Tuple tuple)
throws java.io.IOException
- Specified by:
exec
in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
- Throws:
java.io.IOException
outputSchema
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
- Overrides:
outputSchema
in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
Matthew Hayes, Sam Shah