public class SimpleRandomSampleWithReplacementVote
extends org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
This UDF, together with SimpleRandomSampleWithReplacementElect,
implements a
scalable algorithm for simple random sampling with replacement (SRSWR). It is a
randomized algorithm with a failure rate less than 1.0E-4.
Let s be the desired sample size. To compute an SRSWR sample of size s, for each output
position in {0, 1, ..., s-1}, we want to select an item from the population uniformly
at random. This algorithm consists of two stages: vote and election. In the vote stage,
this UDF (SimpleRandomSampleWithReplacementVote) proposes items, called candidates,
for each position. In the election stage, the paired UDF
(SimpleRandomSampleWithReplacementElect) elects one candidate for each position.
The algorithm succeeds if we have at least one candidate for each position.
To use this UDF pair, the user needs to provide: 1) the desired sample size, and 2) the
exact population size or a good lower bound of it. The input to the vote UDF
(SimpleRandomSampleWithReplacementVote) is a tuple that consists of a bag of
items, the desired sample size (int), and the population size (long) or a good lower
bound of it, where the latter two must be scalars. The output from the vote UDF is a
bag of tuples, each consisting of position:int, score:double, and candidate. The input to the
elect UDF (SimpleRandomSampleWithReplacementElect) is a tuple that contains all
candidates proposed by the vote UDF for some positions. The output from the elect UDF is a
bag of sampled items.
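To make the data flow concrete, here is a minimal in-memory sketch of what the election stage does with records of the form (position, score, candidate). The class `ElectSketch` and its `Candidate` type are hypothetical names used only for illustration; this is not the DataFu implementation, and it ignores Pig's bag and tuple types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the vote/elect data flow (not DataFu code).
final class ElectSketch {
    // One vote-stage output record: (position:int, score:double, candidate).
    static final class Candidate<T> {
        final int position;
        final double score;
        final T item;

        Candidate(int position, double score, T item) {
            this.position = position;
            this.score = score;
            this.item = item;
        }
    }

    // For every position, keep the candidate with the smallest score and
    // return the winning items ordered by position.
    static <T> List<T> elect(List<Candidate<T>> candidates) {
        Map<Integer, Candidate<T>> best = new TreeMap<>();
        for (Candidate<T> c : candidates) {
            best.merge(c.position, c, (a, b) -> a.score <= b.score ? a : b);
        }
        List<T> sample = new ArrayList<>();
        for (Candidate<T> c : best.values()) {
            sample.add(c.item);
        }
        return sample;
    }

    public static void main(String[] args) {
        List<Candidate<String>> votes = Arrays.asList(
            new Candidate<>(0, 0.42, "a"),
            new Candidate<>(1, 0.10, "b"),
            new Candidate<>(0, 0.07, "c"));
        System.out.println(elect(votes)); // prints [c, b]
    }
}
```

Because taking a minimum is associative, the same pairwise merge can be applied to partial groups of candidates before the final election.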
For example, the following script generates a sample of size 100000 with replacement:
    DEFINE SRSWR_VOTE datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
    DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();

    item = LOAD 'input' AS (x:double);
    summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
    candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), 100000, summary.count));
    sampled = FOREACH (GROUP candidates BY position PARALLEL 10) GENERATE FLATTEN(SRSWR_ELECT(candidates));
Because for election we only need to group candidates voted for the same position, this algorithm can use many reducers to consume the candidates. See the "PARALLEL 10" statement above. If the item to sample is the entire row, use TOBAG(TOTUPLE(*)).
SRSWR is heavily used in bootstrapping. Bootstrapping can be done easily with this UDF pair. For example, the following script generates 100 bootstrap samples, computes the mean value for each sample, and then outputs the bootstrap estimates.
    summary = FOREACH (GROUP item ALL) GENERATE AVG(item.x) AS mean, COUNT(item) AS count;
    candidates = FOREACH item GENERATE FLATTEN(SRSWR_VOTE(TOBAG(x), summary.count*100, summary.count));
    sampled = FOREACH (GROUP candidates BY (position % 100) PARALLEL 10) GENERATE AVG(SRSWR_ELECT(candidates)) AS mean;
    bootstrap = FOREACH (GROUP sampled ALL) GENERATE summary.mean AS mean, sampled.mean AS bootstrapMeans;
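The grouping by `position % 100` works because a single SRSWR sample of size 100 times the population size is split into 100 interleaved replicates of equal size. The following sketch mirrors that grouping in plain Java, assuming the elected values are already available as an array indexed by position. `BootstrapGroupingSketch` and `bootstrapMeans` are hypothetical names, not part of DataFu.

```java
// Hypothetical illustration of grouping elected items by (position % replicates).
final class BootstrapGroupingSketch {
    // sampledByPosition[p] is the value elected for output position p.
    // Returns one mean per replicate, where replicate r owns every position p
    // with p % replicates == r.
    static double[] bootstrapMeans(double[] sampledByPosition, int replicates) {
        double[] sums = new double[replicates];
        long[] counts = new long[replicates];
        for (int p = 0; p < sampledByPosition.length; p++) {
            int r = p % replicates;
            sums[r] += sampledByPosition[p];
            counts[r]++;
        }
        double[] means = new double[replicates];
        for (int r = 0; r < replicates; r++) {
            means[r] = counts[r] == 0 ? Double.NaN : sums[r] / counts[r];
        }
        return means;
    }

    public static void main(String[] args) {
        // Four positions, two replicates: positions {0, 2} and {1, 3}.
        double[] means = bootstrapMeans(new double[] {1.0, 10.0, 3.0, 30.0}, 2);
        System.out.println(means[0] + " " + means[1]); // prints 2.0 20.0
    }
}
```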
Another use of this UDF pair is to generate random pairs or tuples without computing the cross product, where each pair or tuple consists of items from different input sources. Let s be the number of random tuples we want to generate. For each input source, simply use the vote UDF to propose candidates, then join the candidates from different sources by their positions, and for each position use the elect UDF to select one candidate from each source to form the pair or tuple for that position.
The algorithm is a simple extension to the work
X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
Basically, for each output position, it performs a random sort on the population (associates each item with a random score independently drawn from the uniform distribution and then sorts items based on the scores), and picks the one that has the smallest score. However, a probabilistic threshold is used to avoid sorting the entire population. For example, if the population size is one billion and the random score generated for an item is 0.9, very likely it won't become the smallest and hence we do not need to propose it as a candidate.
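As a rough illustration of the thresholded random sort described above, the sketch below scores every (item, position) pair, discards scores above a threshold q, and keeps the smallest surviving score per position. It is a deliberately naive single-machine model with hypothetical names (`ThresholdedRandomSortSketch`, `sample`); items are represented by their indices, and q is passed in by the caller rather than derived.

```java
import java.util.Arrays;
import java.util.Random;

// Naive model of the thresholded random sort (hypothetical, not DataFu code).
final class ThresholdedRandomSortSketch {
    // Returns, for each output position, the index of the item holding the
    // smallest random score, or -1 if no item scored at or below q
    // (the failure case for that position).
    static int[] sample(int populationSize, int sampleSize, double q, Random rng) {
        int[] winner = new int[sampleSize];
        double[] bestScore = new double[sampleSize];
        Arrays.fill(winner, -1);
        Arrays.fill(bestScore, Double.POSITIVE_INFINITY);
        for (int item = 0; item < populationSize; item++) {
            for (int position = 0; position < sampleSize; position++) {
                double score = rng.nextDouble();
                if (score > q) {
                    continue; // unlikely to be the minimum, so never proposed
                }
                if (score < bestScore[position]) {
                    bestScore[position] = score;
                    winner[position] = item;
                }
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        // Population of 1000 items, 5 output positions, threshold 0.05.
        int[] winners = sample(1000, 5, 0.05, new Random(42L));
        System.out.println(Arrays.toString(winners));
    }
}
```

With q = 1.0 nothing is discarded and the procedure reduces to a full random sort per position; smaller values of q trade a small failure probability for far fewer candidates.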
More precisely, let n be the population size, n1 be a good lower bound of n, s be the sample size, delta be the failure rate, and q be the threshold. For each output position, the probability that all n random scores are greater than q is (1-q)^n. Thus, by the union bound over the s positions, if we throw away items with associated scores greater than q, then with probability at least 1 - s*(1-q)^n we still capture the item with the smallest score for every position. Fixing delta = s*(1-q)^n and solving for q, we get q = 1 - exp(log(delta/s)/n). Note that replacing n by n1 < n can only decrease the failure rate, though at the cost of an increased number of candidates. The expected number of candidates is (1 - exp(log(delta/s)/n1))*s*n. When n1 equals n, this number is approximately s*log(s/delta).
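The formulas above are easy to evaluate numerically. The sketch below computes the threshold q and the expected number of candidates for given n, n1, s, and delta. The class and method names are hypothetical, and the use of `Math.expm1` is a numerical-accuracy choice made here (q is tiny for large populations), not something the text prescribes.

```java
// Hypothetical helper that evaluates the threshold formulas from the text.
final class ThresholdSketch {
    // q = 1 - exp(log(delta / s) / n1), computed as -expm1(...) so that very
    // small thresholds keep their precision.
    static double threshold(long n1, long s, double delta) {
        return -Math.expm1(Math.log(delta / s) / n1);
    }

    // Expected number of candidates: q * s * n, where q is derived from the
    // lower bound n1 and n is the true population size.
    static double expectedCandidates(long n, long n1, long s, double delta) {
        return threshold(n1, s, delta) * s * n;
    }

    public static void main(String[] args) {
        long n = 1_000_000_000L; // population size
        long s = 100_000L;       // sample size
        double delta = 1.0e-4;   // failure rate
        System.out.println("q = " + threshold(n, s, delta));
        System.out.println("expected candidates = " + expectedCandidates(n, n, s, delta));
        System.out.println("s*log(s/delta) = " + s * Math.log(s / delta));
    }
}
```

For a population of one billion, a sample size of 100000, and delta = 1.0E-4, the expected number of candidates comes to about 2.07 million, a small fraction of the population.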
Generating a random score for each (item, position) pair is very expensive and unnecessary. For each item, the number of positions for which it gets voted follows a binomial distribution B(s,q). We can simply draw a number from this distribution, determine the positions by sampling without replacement, and then generate random scores for those positions. This reduces the running time significantly.
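The shortcut can be sketched per item as follows. This is an illustration under stated assumptions, not the DataFu code: the binomial draw is done by summing geometric gaps between successes, the positions are chosen with Floyd's algorithm for sampling without replacement, and each score is drawn uniformly from [0, q), which is the distribution of a uniform score conditioned on falling below the threshold. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical per-item vote using the binomial shortcut (not DataFu code).
final class BinomialVoteSketch {
    static final class Vote {
        final int position;
        final double score;

        Vote(int position, double score) {
            this.position = position;
            this.score = score;
        }
    }

    // Draws from Binomial(s, q) by adding up geometric waiting times until
    // the s trials are used up; expected work is proportional to s * q.
    static int drawBinomial(int s, double q, Random rng) {
        if (q <= 0.0) return 0;
        if (q >= 1.0) return s;
        double logFail = Math.log1p(-q); // log(1 - q), strictly negative
        int successes = 0;
        double trialsUsed = 0.0;
        while (true) {
            double u = 1.0 - rng.nextDouble(); // uniform on (0, 1]
            trialsUsed += Math.floor(Math.log(u) / logFail) + 1.0;
            if (trialsUsed > s) return successes;
            successes++;
        }
    }

    // Floyd's algorithm: k distinct positions drawn uniformly from {0, ..., s-1}.
    static Set<Integer> choosePositions(int s, int k, Random rng) {
        Set<Integer> chosen = new LinkedHashSet<>();
        for (int j = s - k; j < s; j++) {
            int t = rng.nextInt(j + 1);
            if (!chosen.add(t)) {
                chosen.add(j); // t was taken, and j cannot have been chosen yet
            }
        }
        return chosen;
    }

    // Votes cast by a single item: one (position, score) pair per chosen position.
    static List<Vote> vote(int s, double q, Random rng) {
        int k = drawBinomial(s, q, rng);
        List<Vote> votes = new ArrayList<>();
        for (int position : choosePositions(s, k, rng)) {
            votes.add(new Vote(position, q * rng.nextDouble()));
        }
        return votes;
    }
}
```

Calling `vote(s, q, rng)` once per item replaces s score draws with roughly s*q of them on average, which is where the savings come from when q is small.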
Since for each position we only need the candidate with the smallest score, we implement a combiner to reduce the size of intermediate data in the elect UDF
SimpleRandomSampleWithReplacementElect.
Modifier and Type | Field and Description |
---|---|
static java.lang.String | CANDIDATE_FIELD_NAME |
static double | FAILURE_RATE |
static java.lang.String | OUTPUT_BAG_NAME_PREFIX |
static java.lang.String | POSITION_FIELD_NAME |
static java.lang.String | SCORE_FIELD_NAME |
Constructor and Description |
---|
SimpleRandomSampleWithReplacementVote() |
Modifier and Type | Method and Description |
---|---|
org.apache.pig.data.DataBag | exec(org.apache.pig.data.Tuple tuple) |
org.apache.pig.impl.logicalLayer.schema.Schema | outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input) |
Methods inherited from class org.apache.pig.EvalFunc: allowCompileTimeCalculation, finish, getArgToFuncMapping, getCacheFiles, getInputSchema, getLogger, getPigLogger, getReporter, getReturnType, getSchemaName, getSchemaType, getShipFiles, isAsynchronous, progress, setInputSchema, setPigLogger, setReporter, setUDFContextSignature, warn
public static final java.lang.String OUTPUT_BAG_NAME_PREFIX
public static final java.lang.String CANDIDATE_FIELD_NAME
public static final java.lang.String POSITION_FIELD_NAME
public static final java.lang.String SCORE_FIELD_NAME
public static final double FAILURE_RATE
public SimpleRandomSampleWithReplacementVote()
public org.apache.pig.data.DataBag exec(org.apache.pig.data.Tuple tuple) throws java.io.IOException
Specified by: exec in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>
Throws: java.io.IOException
public org.apache.pig.impl.logicalLayer.schema.Schema outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema input)
Overrides: outputSchema in class org.apache.pig.EvalFunc<org.apache.pig.data.DataBag>