datafu.hourglass.jobs
Class AbstractPartitionPreservingIncrementalJob

java.lang.Object
  extended by org.apache.hadoop.conf.Configured
      extended by datafu.hourglass.jobs.AbstractJob
          extended by datafu.hourglass.jobs.TimeBasedJob
              extended by datafu.hourglass.jobs.IncrementalJob
                  extended by datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob
All Implemented Interfaces:
org.apache.hadoop.conf.Configurable
Direct Known Subclasses:
PartitionPreservingIncrementalJob

public abstract class AbstractPartitionPreservingIncrementalJob
extends IncrementalJob

An IncrementalJob that consumes partitioned input data and produces output data having the same partitions. Typically this is used in conjunction with AbstractPartitionCollapsingIncrementalJob when computing aggregates over sliding windows. A partition-preserving job can perform initial aggregation per-day, which can then be consumed by a partition-collapsing job to produce the final aggregates over the time window. Only Avro is supported for the input, intermediate, and output data.

Implementations of this class must provide key, intermediate value, and output value schemas. The key and intermediate value schemas define the output for the mapper and combiner. The key and output value schemas define the output for the reducer. These are defined by overriding IncrementalJob.getKeySchema(), IncrementalJob.getIntermediateValueSchema(), and IncrementalJob.getOutputValueSchema().

Implementations must also provide a mapper by overriding getMapper() and an accumulator for the reducer by overriding getReducerAccumulator(). An optional combiner may be provided by overriding getCombinerAccumulator(). For the combiner to be used the property use.combiner must also be set to true.
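As an illustration of the two preceding paragraphs, the following sketch shows a subclass that counts events per member ID. It is not part of the library: the class name, schemas, and field names are hypothetical, and the datafu.hourglass.model package and the map/accumulate/getFinal/cleanup signatures of Mapper, Accumulator, and KeyValueCollector are assumptions that should be checked against their own documentation.

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import datafu.hourglass.jobs.AbstractPartitionPreservingIncrementalJob;
import datafu.hourglass.model.Accumulator;        // assumed package
import datafu.hourglass.model.KeyValueCollector;  // assumed package
import datafu.hourglass.model.Mapper;             // assumed package

public class EventCountJob extends AbstractPartitionPreservingIncrementalJob
{
  // Hypothetical key and value schemas: count events per member_id
  private static final Schema KEY_SCHEMA = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"example\",\"fields\":[{\"name\":\"member_id\",\"type\":\"long\"}]}");
  private static final Schema VALUE_SCHEMA = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"example\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}");

  public EventCountJob(String name, Properties props) throws IOException
  {
    super(name, props);
  }

  @Override
  public Schema getKeySchema() { return KEY_SCHEMA; }

  @Override
  public Schema getIntermediateValueSchema() { return VALUE_SCHEMA; }

  @Override
  public Schema getOutputValueSchema() { return VALUE_SCHEMA; }

  @Override
  public Mapper<GenericRecord,GenericRecord,GenericRecord> getMapper()
  {
    return new CountMapper();
  }

  @Override
  public Accumulator<GenericRecord,GenericRecord> getReducerAccumulator()
  {
    return new CountAccumulator();
  }

  // Static nested classes so the mapper and accumulator carry no reference
  // to the enclosing job instance.
  static class CountMapper implements Mapper<GenericRecord,GenericRecord,GenericRecord>
  {
    @Override
    public void map(GenericRecord input, KeyValueCollector<GenericRecord,GenericRecord> collector)
      throws IOException, InterruptedException
    {
      // Emit a count of 1 for each input record's member_id (hypothetical input field)
      GenericRecord key = new GenericData.Record(KEY_SCHEMA);
      key.put("member_id", input.get("member_id"));
      GenericRecord value = new GenericData.Record(VALUE_SCHEMA);
      value.put("count", 1L);
      collector.collect(key, value);
    }
  }

  static class CountAccumulator implements Accumulator<GenericRecord,GenericRecord>
  {
    private long total;

    @Override
    public void accumulate(GenericRecord value)
    {
      total += (Long)value.get("count");
    }

    @Override
    public GenericRecord getFinal()
    {
      GenericRecord result = new GenericData.Record(VALUE_SCHEMA);
      result.put("count", total);
      return result;
    }

    @Override
    public void cleanup()
    {
      total = 0L;
    }
  }
}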

The distinguishing feature of this type of job is that the input partitioning is preserved in the output. The data from each partition is processed independently of other partitions and then output separately. For example, input that is partitioned by day can be aggregated by day and then output by day. This is achieved by attaching a long value to each key, which represents the partition, so that the reducer receives data grouped by the key and partition together. Multiple outputs are then used so that the output will have the same partitions as the input.

The input path can be provided either through the property input.path or by calling AbstractJob.setInputPaths(List). If multiple input paths are provided then this implicitly means a join is to be performed. Multiple input paths can be provided via properties by prefixing each with input.path., such as input.path.first and input.path.second. Input data must be partitioned by day according to the naming convention yyyy/MM/dd. The output path can be provided either through the property output.path or by calling AbstractJob.setOutputPath(Path). Output data will be written using the same naming convention as the input, namely yyyy/MM/dd, where the date used to format the output path is the same as the date of the input it was derived from. For example, if the desired time range to process is 2013/01/01 through 2013/01/14, then the output will be named 2013/01/01 through 2013/01/14. By default the job will fail if any input data in the desired time window is missing. This can be overridden by setting fail.on.missing to false.
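As a sketch of wiring up the paths, the following driver configures the hypothetical EventCountJob subclass shown above through properties; the paths and job name are placeholders.

import java.util.Properties;

public class EventCountDriver
{
  public static void main(String[] args) throws Exception
  {
    Properties props = new Properties();
    props.setProperty("input.path", "/data/event");           // expects /data/event/yyyy/MM/dd partitions
    props.setProperty("output.path", "/output/event_count");  // output keeps the same yyyy/MM/dd layout
    props.setProperty("fail.on.missing", "false");            // tolerate missing days in the window

    EventCountJob job = new EventCountJob("eventCount", props);
    job.run();
  }
}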

The job will not process input for which a corresponding output already exists. For example, if the desired date range is 2013/01/01 through 2013/01/14 and the outputs 2013/01/01 through 2013/01/12 exist, then only 2013/01/13 and 2013/01/14 will be processed and only 2013/01/13 and 2013/01/14 will be produced.

The number of paths in the output to retain can be configured through the property retention.count, or by calling AbstractJob.setRetentionCount(Integer). When this property is set only the latest paths in the output will be kept; the remainder will be removed. By default there is no retention count set so all output paths are kept.
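For example, continuing the hypothetical driver sketch above, the latest 30 output partitions could be retained as follows (the value 30 is illustrative):

job.setRetentionCount(30);  // keep only the latest 30 output paths, remove the rest
// or equivalently via properties:
// props.setProperty("retention.count", "30");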

The inputs to process can be controlled by defining a desired date range. By default the job will process all input data available. To limit the number of days of input to process one can set the property num.days or call TimeBasedJob.setNumDays(Integer). This defines a processing window with that number of days, where the end date of the window is the latest available input and the start date is num.days ago. Only inputs within this window will be processed. Because the end date is the same as the latest available input, as new input data becomes available the end of the window will advance forward to include it. The end date can be adjusted backwards relative to the latest input through the property days.ago, or by calling TimeBasedJob.setDaysAgo(Integer). This subtracts the given number of days from the latest available input date to determine the end date. The start date or end date can also be fixed by setting the properties start.date or end.date, or by calling TimeBasedJob.setStartDate(Date) or TimeBasedJob.setEndDate(Date).
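The window can also be set programmatically on the job instance. The following lines are a sketch (continuing the hypothetical driver above) using the setters documented on TimeBasedJob; the 30-day window, one-day lag, and dates are illustrative values.

// Limit processing to a 30-day window ending one day before the latest available input.
job.setNumDays(30);
job.setDaysAgo(1);

// Or pin the window explicitly using java.util.Date values:
java.text.SimpleDateFormat fmt = new java.text.SimpleDateFormat("yyyy/MM/dd");
job.setStartDate(fmt.parse("2013/01/01"));
job.setEndDate(fmt.parse("2013/01/14"));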

The number of reducers to use is automatically determined based on the size of the data to process. The total size is computed and then divided by the value of the property num.reducers.bytes.per.reducer, which defaults to 256 MB; the resulting quotient is the number of reducers that will be used. For example, 10 GB of input with the default setting yields 40 reducers. The number of reducers can also be set to a fixed value through the property num.reducers.

This type of job is capable of performing its work over multiple iterations. The number of days to process at a time can be limited by setting the property max.days.to.process, or by calling IncrementalJob.setMaxToProcess(Integer). The default is 90 days. This can be useful when there are restrictions on how many tasks can be used by a single MapReduce job in the cluster. When this property is set, the job will process no more than this many days at a time, and it will perform one or more iterations if necessary to complete the work. The number of iterations can be limited by setting the property max.iterations, or by calling IncrementalJob.setMaxIterations(Integer). If the number of iterations is exceeded the job will fail. By default the maximum number of iterations is 20.
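For instance, continuing the driver sketch above, the work per execution can be bounded as follows; both values are illustrative:

job.setMaxToProcess(30);  // process no more than 30 days per MapReduce execution
job.setMaxIterations(5);  // fail if more than 5 iterations would be required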

Hadoop configuration may be provided by setting a property with the prefix hadoop-conf.. For example, mapred.min.split.size can be configured by setting property hadoop-conf.mapred.min.split.size to the desired value.
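For example, the following properties (with illustrative values) pass Hadoop settings through to the underlying jobs in the driver sketch above:

props.setProperty("hadoop-conf.mapred.min.split.size", "536870912");  // 512 MB minimum split size
props.setProperty("hadoop-conf.mapred.job.queue.name", "default");    // any Hadoop property may be prefixed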

Author:
"Matthew Hayes"

Nested Class Summary
static class AbstractPartitionPreservingIncrementalJob.Report
          Reports files created and processed for an iteration of the job.
 
Constructor Summary
AbstractPartitionPreservingIncrementalJob()
          Initializes the job.
AbstractPartitionPreservingIncrementalJob(java.lang.String name, java.util.Properties props)
          Initializes the job with a job name and properties.
 
Method Summary
protected  ObjectReducer getCombineProcessor()
           
 Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getCombinerAccumulator()
          Gets the accumulator used for the combiner.
abstract  Mapper<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getMapper()
          Gets the mapper.
protected  ObjectMapper getMapProcessor()
           
protected  java.lang.String getOutputSchemaName()
          Get the name for the reduce output schema.
protected  java.lang.String getOutputSchemaNamespace()
          Get the namespace for the reduce output schema.
protected  ObjectReducer getReduceProcessor()
           
abstract  Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getReducerAccumulator()
          Gets the accumulator used for the reducer.
 java.util.List<AbstractPartitionPreservingIncrementalJob.Report> getReports()
          Get reports that summarize each of the job iterations.
protected  void initialize()
          Initialization required before running job.
 void run()
          Run the job.
 
Methods inherited from class datafu.hourglass.jobs.IncrementalJob
getIntermediateValueSchema, getKeySchema, getMaxIterations, getMaxToProcess, getOutputValueSchema, getSchemas, isFailOnMissing, setFailOnMissing, setMaxIterations, setMaxToProcess, setProperties
 
Methods inherited from class datafu.hourglass.jobs.TimeBasedJob
getDaysAgo, getEndDate, getNumDays, getStartDate, setDaysAgo, setEndDate, setNumDays, setStartDate, validate
 
Methods inherited from class datafu.hourglass.jobs.AbstractJob
config, createRandomTempPath, ensurePath, getCountersParentPath, getFileSystem, getInputPaths, getName, getNumReducers, getOutputPath, getProperties, getRetentionCount, getTempPath, isUseCombiner, randomTempPath, setCountersParentPath, setInputPaths, setName, setNumReducers, setOutputPath, setRetentionCount, setTempPath, setUseCombiner
 
Methods inherited from class org.apache.hadoop.conf.Configured
getConf, setConf
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

AbstractPartitionPreservingIncrementalJob

public AbstractPartitionPreservingIncrementalJob()
                                          throws java.io.IOException
Initializes the job.

Throws:
java.io.IOException

AbstractPartitionPreservingIncrementalJob

public AbstractPartitionPreservingIncrementalJob(java.lang.String name,
                                                 java.util.Properties props)
                                          throws java.io.IOException
Initializes the job with a job name and properties.

Parameters:
name - job name
props - configuration properties
Throws:
java.io.IOException
Method Detail

getMapper

public abstract Mapper<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getMapper()
Gets the mapper.

Returns:
mapper

getCombinerAccumulator

public Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getCombinerAccumulator()
Gets the accumulator used for the combiner.

Returns:
combiner accumulator

getReducerAccumulator

public abstract Accumulator<org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord> getReducerAccumulator()
Gets the accumulator used for the reducer.

Returns:
reducer accumulator

run

public void run()
         throws java.io.IOException,
                java.lang.InterruptedException,
                java.lang.ClassNotFoundException
Run the job.

Specified by:
run in class AbstractJob
Throws:
java.io.IOException
java.lang.InterruptedException
java.lang.ClassNotFoundException

getReports

public java.util.List<AbstractPartitionPreservingIncrementalJob.Report> getReports()
Get reports that summarize each of the job iterations.

Returns:
reports

initialize

protected void initialize()
Description copied from class: AbstractJob
Initialization required before running job.

Overrides:
initialize in class IncrementalJob

getOutputSchemaName

protected java.lang.String getOutputSchemaName()
Get the name for the reduce output schema. By default this is the name of the class with "Output" appended.

Returns:
output schema name

getOutputSchemaNamespace

protected java.lang.String getOutputSchemaNamespace()
Get the namespace for the reduce output schema. By default this is the package of the class.

Returns:
output schema namespace

getMapProcessor

protected ObjectMapper getMapProcessor()

getCombineProcessor

protected ObjectReducer getCombineProcessor()

getReduceProcessor

protected ObjectReducer getReduceProcessor()
