Custom Aggregator Cascading
/**
 * Skeleton of a custom Cascading aggregator whose working state lives in a
 * user-defined {@link Context} object attached to the {@code AggregatorCall}.
 *
 * <p>The three callbacks below form the template: {@code start} installs a new
 * {@code Context}, {@code aggregate} fetches the current arguments and the
 * context, and {@code complete} pushes a result {@code Tuple} to the output
 * collector. The places where real aggregation logic belongs are marked with
 * comments; as written, no values are stored or emitted beyond a newly
 * constructed {@code Tuple}.
 */
public class SomeAggregator extends BaseOperation<SomeAggregator.Context>
    implements Aggregator<SomeAggregator.Context>
{
    /**
     * State carrier handed from {@code start} through {@code aggregate} to
     * {@code complete}. The single {@code value} slot is a placeholder for
     * whatever the aggregation needs to accumulate.
     */
    public static class Context
    {
        Object value;
    }

    /**
     * Prepares state for a grouping: reads the group entry and attaches a
     * newly created {@link Context} to the call.
     *
     * @param flowProcess    the flow process handle (not used by this template)
     * @param aggregatorCall the call object that supplies the group and receives the context
     */
    public void start( FlowProcess flowProcess,
                       AggregatorCall<Context> aggregatorCall )
    {
        // grouping values for the grouping being started; available for seeding the state
        final TupleEntry groupEntry = aggregatorCall.getGroup();

        // a brand-new state object is built here on every invocation
        final Context freshState = new Context();

        // (optional) initialise freshState here, e.g. from groupEntry

        // attach the state to the call so the other callbacks can retrieve it
        aggregatorCall.setContext( freshState );
    }

    /**
     * Reads the current argument values and the context previously attached
     * to the call. The actual accumulation step is left to the implementer.
     *
     * @param flowProcess    the flow process handle (not used by this template)
     * @param aggregatorCall the call object that supplies the arguments and the context
     */
    public void aggregate( FlowProcess flowProcess,
                           AggregatorCall<Context> aggregatorCall )
    {
        // argument values for the current invocation
        final TupleEntry currentArgs = aggregatorCall.getArguments();

        // state belonging to the grouping in progress
        final Context state = aggregatorCall.getContext();

        // fold currentArgs into state here
    }

    /**
     * Finishes the aggregation by adding a result {@code Tuple} to the call's
     * output collector. In this template the tuple is added exactly as
     * constructed, with no values inserted.
     *
     * @param flowProcess    the flow process handle (not used by this template)
     * @param aggregatorCall the call object that supplies the context and the output collector
     */
    public void complete( FlowProcess flowProcess,
                          AggregatorCall<Context> aggregatorCall )
    {
        final Context state = aggregatorCall.getContext();

        // container for the values this aggregation produces
        final Tuple output = new Tuple();

        // populate output from state here

        // hand the result tuple to the output collector
        aggregatorCall.getOutputCollector().add( output );
    }
}