datafu.hourglass.jobs
Class AbstractPartitionCollapsingIncrementalJob

java.lang.Object
  extended by org.apache.hadoop.conf.Configured
      extended by datafu.hourglass.jobs.AbstractJob
          extended by datafu.hourglass.jobs.TimeBasedJob
              extended by datafu.hourglass.jobs.IncrementalJob
                  extended by datafu.hourglass.jobs.AbstractPartitionCollapsingIncrementalJob
All Implemented Interfaces:
org.apache.hadoop.conf.Configurable
Direct Known Subclasses:
PartitionCollapsingIncrementalJob

public abstract class AbstractPartitionCollapsingIncrementalJob
extends IncrementalJob

An IncrementalJob that consumes partitioned input data and collapses the partitions to produce a single output. This job can be used to process data using a sliding window. It is capable of reusing the previous output, which allows it to process new data more efficiently than recomputing from scratch. Only Avro is supported for the input, intermediate, and output data.

Implementations of this class must provide key, intermediate value, and output value schemas. The key and intermediate value schemas define the output for the mapper and combiner. The key and output value schemas define the output for the reducer. These are defined by overriding IncrementalJob.getKeySchema(), IncrementalJob.getIntermediateValueSchema(), and IncrementalJob.getOutputValueSchema().
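
For illustration, a concrete subclass counting events per member might define the schemas as follows. This is a sketch only: the record and field names are invented for the example, the schemas are kept as JSON strings so that the serializable mapper and accumulator sketched below can re-parse them (Avro's Schema class is not Serializable), and the schema getters are assumed to be declared protected in IncrementalJob.

    import org.apache.avro.Schema;

    // Hypothetical schemas for a job counting events per member.
    static final String KEY_SCHEMA =
      "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"com.example\","
      + "\"fields\":[{\"name\":\"member_id\",\"type\":\"long\"}]}";
    static final String VALUE_SCHEMA =
      "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"com.example\","
      + "\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}";

    @Override
    protected Schema getKeySchema()
    {
      return new Schema.Parser().parse(KEY_SCHEMA);
    }

    @Override
    protected Schema getIntermediateValueSchema()
    {
      return new Schema.Parser().parse(VALUE_SCHEMA);
    }

    @Override
    protected Schema getOutputValueSchema()
    {
      // Same as the intermediate schema here, so no record merger is needed (see below).
      return new Schema.Parser().parse(VALUE_SCHEMA);
    }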

Implementations must also provide a mapper by overriding getMapper() and an accumulator for the reducer by overriding getReducerAccumulator(). An optional combiner may be provided by overriding getCombinerAccumulator(). For the combiner to be used the property use.combiner must also be set to true.
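
Continuing the sketch, a mapper and reducer accumulator for the event-counting example might look like the following. The Mapper, Accumulator, and KeyValueCollector interfaces are assumed to live in datafu.hourglass.model with the signatures shown. Static nested classes are used because these objects are serialized out to the tasks and must not capture the (non-serializable) job instance.

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import datafu.hourglass.model.Accumulator;
    import datafu.hourglass.model.KeyValueCollector;
    import datafu.hourglass.model.Mapper;

    @Override
    public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
    {
      return new CountMapper();
    }

    @Override
    public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
    {
      return new CountAccumulator();
    }

    static class CountMapper implements Mapper<GenericRecord,GenericRecord,GenericRecord>
    {
      private transient Schema keySchema;
      private transient Schema valueSchema;

      @Override
      public void map(GenericRecord input,
                      KeyValueCollector<GenericRecord,GenericRecord> collector)
        throws IOException, InterruptedException
      {
        // Schemas are parsed lazily because Avro's Schema is not Serializable.
        if (keySchema == null) keySchema = new Schema.Parser().parse(KEY_SCHEMA);
        if (valueSchema == null) valueSchema = new Schema.Parser().parse(VALUE_SCHEMA);
        GenericRecord key = new GenericData.Record(keySchema);
        key.put("member_id", input.get("member_id"));
        GenericRecord value = new GenericData.Record(valueSchema);
        value.put("count", 1);
        collector.collect(key, value); // emit one count per input record
      }
    }

    static class CountAccumulator implements Accumulator<GenericRecord,GenericRecord>
    {
      private transient Schema valueSchema;
      private long count;

      @Override
      public void accumulate(GenericRecord value)
      {
        count += (Integer)value.get("count"); // sum partial counts for the key
      }

      @Override
      public GenericRecord getFinal()
      {
        if (valueSchema == null) valueSchema = new Schema.Parser().parse(VALUE_SCHEMA);
        GenericRecord output = new GenericData.Record(valueSchema);
        output.put("count", (int)count);
        return output;
      }

      @Override
      public void cleanup()
      {
        count = 0L; // reset per-key state between keys
      }
    }

Since counting is associative, the same accumulator could also serve as the combiner by returning it from getCombinerAccumulator(), with use.combiner set to true.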

The input path can be provided either through the property input.path or by calling AbstractJob.setInputPaths(List). If multiple input paths are provided then this implicitly means a join is to be performed. Multiple input paths can be provided via properties by prefixing each with input.path., such as input.path.first and input.path.second. Input data must be partitioned by day according to the naming convention yyyy/MM/dd. The output path can be provided either through the property output.path or by calling AbstractJob.setOutputPath(Path). Output data will be written using the naming convention yyyyMMdd, where the date used to format the output path is the same as the end of the desired time range to process. For example, if the desired time range to process is 2013/01/01 through 2013/01/14, then the output will be named 20130114. By default the job will fail if any input data in the desired time window is missing. This can be overridden by setting fail.on.missing to false.
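
For example, a driver for a hypothetical subclass MyJob might be configured through properties as follows (all paths and names are illustrative):

    import java.util.Properties;

    Properties props = new Properties();
    props.setProperty("input.path", "/data/event");          // partitioned as /data/event/yyyy/MM/dd
    props.setProperty("output.path", "/output/event_count"); // written as /output/event_count/yyyyMMdd
    props.setProperty("fail.on.missing", "false");           // tolerate gaps in the input range
    MyJob job = new MyJob("eventCount", props);               // uses the (name, properties) constructor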

The job will not process input if the corresponding output has already been produced. For example, if the desired date range is 2013/01/01 through 2013/01/14 and the output 20130114 already exists, then the job assumes the work has already been completed.

By default only the latest output will be kept. All other outputs will be removed. This can be controlled by setting the property retention.count, or by calling AbstractJob.setRetentionCount(Integer).
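
For example, to keep the two most recent outputs in the driver sketch above:

    job.setRetentionCount(2); // equivalently: props.setProperty("retention.count", "2")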

Two types of sliding windows may be used: fixed-length and fixed-start. For a fixed-length sliding window, the size of the window is fixed; the start and end move according to the availability of input data. For a fixed-start window, the size of the window is flexible; the start is fixed and the end moves according to the availability of input data.

A fixed-length sliding window can be defined either by setting the property num.days or by calling TimeBasedJob.setNumDays(Integer). This sets how many days of input data will be consumed. By default the end of the window will be the same as the date of the latest available input data. The start is then determined by the number of days to consume. The end date can be moved back relative to the latest input data by setting the days.ago property or by calling TimeBasedJob.setDaysAgo(Integer). Since the end date is determined by the availability of input data, as new data arrives the window will advance forward.

A fixed-start sliding window can be defined by setting the property start.date or by calling TimeBasedJob.setStartDate(java.util.Date). The end date will be the same as the date of the latest available input data. The end date can be moved back relative to the latest input data by setting the days.ago property or by calling TimeBasedJob.setDaysAgo(Integer). Because the end date is determined by the availability of input data, as new data arrives the window will grow to include it.
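
Continuing the driver sketch, the two window types would be configured along these lines (values are illustrative):

    // Fixed-length window: consume the most recent 30 days of input,
    // ending one day before the latest available input.
    job.setNumDays(30);
    job.setDaysAgo(1);

    // Fixed-start window (alternative): consume everything from the
    // start date through the latest available input.
    // job.setStartDate(new java.text.SimpleDateFormat("yyyy/MM/dd").parse("2013/01/01"));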

Previous output can be reused by setting the reuse.previous.output property to true, or by calling setReusePreviousOutput(boolean). Reusing the previous output is often more efficient because only input data outside of the time window covered by the previous output needs to be consumed. For example, given a fixed-start sliding window job, if one new day of input data is available since the last time the job ran, then the job can reuse the previous output and only read the newest day of data, rather than reading all the input data again. Given a fixed-length sliding window in the same scenario, the new output can be produced by adding the newest input to the previous output and subtracting the oldest input from the old window.
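
Enabling reuse in the driver sketch is a single call:

    job.setReusePreviousOutput(true); // equivalently: props.setProperty("reuse.previous.output", "true")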

For a fixed-start sliding window, if the schemas for the intermediate and output values are the same then no additional changes are necessary, as the reducer's accumulator should be capable of adding the new input to the previous output. However, if they are different, then a record merger must be defined by overriding getRecordMerger() so that the previous output can be merged with the partial output produced by reducing the new input data. For the fixed-length sliding window one must override getOldRecordMerger() to reuse the previous output. This method essentially unmerges old, partial output data from the current output. For this case as well, if the intermediate and output schemas are the same, the getRecordMerger() method does not need to be overridden.
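
The mechanics are easy to illustrate with the count example, where merging reduces to adding and unmerging to subtracting the count field (with matching schemas, as in the sketch above, the default getRecordMerger() would actually suffice). The sketch assumes Merger declares a single merge method taking the previous and new records; static nested classes keep the mergers serializable.

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import datafu.hourglass.model.Merger;

    @Override
    public Merger<GenericRecord> getRecordMerger()
    {
      return new AddMerger(); // merge previous output with new partial output
    }

    @Override
    public Merger<GenericRecord> getOldRecordMerger()
    {
      return new SubtractMerger(); // unmerge old partial output (fixed-length windows)
    }

    // Adds the newly reduced counts into the previous output.
    static class AddMerger implements Merger<GenericRecord>
    {
      @Override
      public GenericRecord merge(GenericRecord previous, GenericRecord current)
      {
        GenericRecord merged = new GenericData.Record(previous.getSchema());
        merged.put("count", (Integer)previous.get("count") + (Integer)current.get("count"));
        return merged;
      }
    }

    // Subtracts counts that have fallen out of the window from the previous output.
    static class SubtractMerger implements Merger<GenericRecord>
    {
      @Override
      public GenericRecord merge(GenericRecord previous, GenericRecord old)
      {
        GenericRecord merged = new GenericData.Record(previous.getSchema());
        merged.put("count", (Integer)previous.get("count") - (Integer)old.get("count"));
        return merged;
      }
    }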

The number of reducers to use is automatically determined based on the size of the data to process. The total size is computed and then divided by the value of the property num.reducers.bytes.per.reducer, which defaults to 256 MB. The result is the number of reducers that will be used. This calculation includes the input data as well as previous output that will be reused. It is also possible to calculate the number of reducers separately for the input and previous output through the properties num.reducers.input.bytes.per.reducer and num.reducers.previous.bytes.per.reducer. The reducer counts will be computed separately for the two sets of data and then added together. The number of reducers can also be set to a fixed value through the property num.reducers.
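
For example, if an iteration will consume 2 GB of new input and reuse 512 MB of previous output, the default setting yields (2048 MB + 512 MB) / 256 MB = 10 reducers.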

This type of job is capable of performing its work over multiple iterations if previous output can be reused. The number of days to process at a time can be limited by setting the property max.days.to.process, or by calling IncrementalJob.setMaxToProcess(Integer). The default is 90 days. This can be useful when there are restrictions on how many tasks can be used by a single MapReduce job in the cluster. When this property is set, the job will process no more than this many days at a time, and it will perform one or more iterations if necessary to complete the work. The number of iterations can be limited by setting the property max.iterations, or by calling IncrementalJob.setMaxIterations(Integer). If the number of iterations is exceeded the job will fail. By default the maximum number of iterations is 20.
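
For example, via the driver properties sketched earlier (values are illustrative):

    props.setProperty("max.days.to.process", "30"); // process at most 30 days per iteration
    props.setProperty("max.iterations", "5");       // fail if more than 5 iterations would be needed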

Hadoop configuration may be provided by setting a property with the prefix hadoop-conf.. For example, mapred.min.split.size can be configured by setting property hadoop-conf.mapred.min.split.size to the desired value.
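
Continuing the properties sketch, with an illustrative value:

    props.setProperty("hadoop-conf.mapred.min.split.size", "134217728"); // sets mapred.min.split.size to 128 MB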

Author:
"Matthew Hayes"

Nested Class Summary
static class AbstractPartitionCollapsingIncrementalJob.Report
          Reports files created and processed for an iteration of the job.
 
Field Summary
protected  boolean _reusePreviousOutput
           
 
Constructor Summary
AbstractPartitionCollapsingIncrementalJob()
          Initializes the job.
AbstractPartitionCollapsingIncrementalJob(java.lang.String name, java.util.Properties props)
          Initializes the job with a job name and properties.
 
Method Summary
 Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getCombinerAccumulator()
          Gets the accumulator used for the combiner.
abstract  Mapper<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getMapper()
          Gets the mapper.
 Merger<org.apache.avro.generic.GenericRecord> getOldRecordMerger()
          Gets the record merger that is capable of unmerging old partial output from the new output.
protected  java.lang.String getOutputSchemaName()
          Get the name for the reduce output schema.
protected  java.lang.String getOutputSchemaNamespace()
          Get the namespace for the reduce output schema.
 Merger<org.apache.avro.generic.GenericRecord> getRecordMerger()
          Gets the record merger that is capable of merging previous output with a new partial output.
abstract  Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getReducerAccumulator()
          Gets the accumulator used for the reducer.
 java.util.List<AbstractPartitionCollapsingIncrementalJob.Report> getReports()
          Get reports that summarize each of the job iterations.
 boolean getReusePreviousOutput()
          Get whether previous output should be reused.
protected  void initialize()
          Initialization required before running job.
 void run()
          Run the job.
 void setProperties(java.util.Properties props)
          Sets the configuration properties.
 void setReusePreviousOutput(boolean reuse)
          Set whether previous output should be reused.
 
Methods inherited from class datafu.hourglass.jobs.IncrementalJob
getIntermediateValueSchema, getKeySchema, getMaxIterations, getMaxToProcess, getOutputValueSchema, getSchemas, isFailOnMissing, setFailOnMissing, setMaxIterations, setMaxToProcess
 
Methods inherited from class datafu.hourglass.jobs.TimeBasedJob
getDaysAgo, getEndDate, getNumDays, getStartDate, setDaysAgo, setEndDate, setNumDays, setStartDate, validate
 
Methods inherited from class datafu.hourglass.jobs.AbstractJob
config, createRandomTempPath, ensurePath, getCountersParentPath, getFileSystem, getInputPaths, getName, getNumReducers, getOutputPath, getProperties, getRetentionCount, getTempPath, isUseCombiner, randomTempPath, setCountersParentPath, setInputPaths, setName, setNumReducers, setOutputPath, setRetentionCount, setTempPath, setUseCombiner
 
Methods inherited from class org.apache.hadoop.conf.Configured
getConf, setConf
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

_reusePreviousOutput

protected boolean _reusePreviousOutput
Constructor Detail

AbstractPartitionCollapsingIncrementalJob

public AbstractPartitionCollapsingIncrementalJob()
                                          throws java.io.IOException
Initializes the job.

Throws:
java.io.IOException

AbstractPartitionCollapsingIncrementalJob

public AbstractPartitionCollapsingIncrementalJob(java.lang.String name,
                                                 java.util.Properties props)
                                          throws java.io.IOException
Initializes the job with a job name and properties.

Parameters:
name - job name
props - configuration properties
Throws:
java.io.IOException
Method Detail

getMapper

public abstract Mapper<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getMapper()
Gets the mapper.

Returns:
mapper

getCombinerAccumulator

public Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getCombinerAccumulator()
Gets the accumulator used for the combiner.

Returns:
combiner accumulator

getReducerAccumulator

public abstract Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getReducerAccumulator()
Gets the accumulator used for the reducer.

Returns:
reducer accumulator

getRecordMerger

public Merger<org.apache.avro.generic.GenericRecord> getRecordMerger()
Gets the record merger that is capable of merging previous output with a new partial output. This is only needed when reusing previous output where the intermediate and output schemas are different. New partial output is produced by the reducer from new input that is after the previous output.

Returns:
merger

getOldRecordMerger

public Merger<org.apache.avro.generic.GenericRecord> getOldRecordMerger()
Gets the record merger that is capable of unmerging old partial output from the new output. This is only needed when reusing previous output for a fixed-length sliding window. The new output is the result of merging the previous output with the new partial output. The old partial output is produced by the reducer from old input data before the time range of the previous output.

Returns:
merger

getOutputSchemaName

protected java.lang.String getOutputSchemaName()
Get the name for the reduce output schema. By default this is the name of the class with "Output" appended.

Returns:
output schema name

getOutputSchemaNamespace

protected java.lang.String getOutputSchemaNamespace()
Get the namespace for the reduce output schema. By default this is the package of the class.

Returns:
output schema namespace

setProperties

public void setProperties(java.util.Properties props)
Description copied from class: AbstractJob
Sets the configuration properties.

Overrides:
setProperties in class IncrementalJob
Parameters:
props - Properties

getReusePreviousOutput

public boolean getReusePreviousOutput()
Get whether previous output should be reused.

Returns:
true if previous output should be reused

setReusePreviousOutput

public void setReusePreviousOutput(boolean reuse)
Set whether previous output should be reused.

Parameters:
reuse - true if previous output should be reused

initialize

protected void initialize()
Description copied from class: AbstractJob
Initialization required before running job.

Overrides:
initialize in class IncrementalJob

run

public void run()
         throws java.io.IOException,
                java.lang.InterruptedException,
                java.lang.ClassNotFoundException
Description copied from class: AbstractJob
Run the job.

Specified by:
run in class AbstractJob
Throws:
java.io.IOException
java.lang.InterruptedException
java.lang.ClassNotFoundException

getReports

public java.util.List<AbstractPartitionCollapsingIncrementalJob.Report> getReports()
Get reports that summarize each of the job iterations.

Returns:
reports

