Average Buffer Cascading
public class AverageBuffer extends BaseOperation implements Buffer
{
public AverageBuffer()
{
super( 1, new Fields( "average" ) );
}
public AverageBuffer( Fields fieldDeclaration )
{
super( 1, fieldDeclaration );
}
public void operate( FlowProcess flowProcess, BufferCall bufferCall )
{
// init the count and sum
long count = 0;
long sum = 0;
// get all the current argument values for this grouping
Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();
while( arguments.hasNext() )
{
count++;
sum += arguments.next().getInteger( 0 );
}
// create a Tuple to hold our result values
Tuple result = new Tuple( sum / count );
// return the result Tuple
bufferCall.getOutputCollector().add( result );
}
}