Answers for "Add Tuples aggregator Cascading"

0

Add Tuples aggregator Cascading

public class AddTuplesAggregator
  extends BaseOperation<AddTuplesAggregator.Context>
  implements Aggregator<AddTuplesAggregator.Context>
  {
  public static class Context
    {
    long value = 0;
    }

  public AddTuplesAggregator()
    {
    // expects 1 argument, fail otherwise
    super( 1, new Fields( "sum" ) );
    }

  public AddTuplesAggregator( Fields fieldDeclaration )
    {
    // expects 1 argument, fail otherwise
    super( 1, fieldDeclaration );
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // set the context object, starting at zero
    aggregatorCall.setContext( new Context() );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    TupleEntry arguments = aggregatorCall.getArguments();
    Context context = aggregatorCall.getContext();

    // add the current argument value to the current sum
    context.value += arguments.getInteger( 0 );
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // set the sum
    result.add( context.value );

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }
Posted by: Guest on September-28-2021

Browse Popular Code Answers by Language