限制reducer的输出

Limit the output from reducer

我有一个映射器 class 可以生成几十行。然后通过 mapreduce 内部框架对该输出进行排序和合并。排序后,我只想获得 reducer 输出的前 5 条记录。我怎样才能做到这一点? 我维护了一个计数变量,它在 reduce 方法中递增。但这不起作用,它在输出中给出所有记录。我认为这是因为 reduce class 被调用到 reducer 的每个输入行,所以 count 每次都被初始化为 0。有没有办法维护全局变量?

public class Reduce2 扩展 Reducer{

int count=0;
@Override
protected void reduce(IntWritable1 key, Iterable<Text> values,Context context) throws IOException, InterruptedException {

    int count=0;
    String key1 = "";
    for(Text value:values) {
        key1+=value;
    }
    if(count<5) {
        count++;
        context.write(new Text(key1), key);

    }
}

}

Reducer 的

运行() 方法执行一次,它为每个键调用 reduce() 方法。以下是 Reducer 的 运行() 方法的默认代码。

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }

因此,如果您在 reduce() 方法中定义了 count 变量,那么每次(对于每个键)都会对其进行初始化。而是在你的 reducer 实现中覆盖 Reducer 的这个 运行() 方法,并将计数变量移动到这个 运行() 方法。

  public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        int count=0;
        try {
          while (context.nextKey() && count<5) {
              count++;
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<Text> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<Text>)iter).resetBackupStore();        
            }
          }
        } finally {
          cleanup(context);
        }
}

这应该有效。