限制reducer的输出
Limit the output from reducer
我有一个映射器 class 可以生成几十行。然后通过 mapreduce 内部框架对该输出进行排序和合并。排序后,我只想获得 reducer 输出的前 5 条记录。我怎样才能做到这一点?
我维护了一个计数变量,它在 reduce 方法中递增。但这不起作用,它在输出中给出所有记录。我认为这是因为 reduce class 被调用到 reducer 的每个输入行,所以 count 每次都被初始化为 0。有没有办法维护全局变量?
public class Reduce2 扩展 Reducer{
int count=0;
@Override
protected void reduce(IntWritable1 key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
int count=0;
String key1 = "";
for(Text value:values) {
key1+=value;
}
if(count<5) {
count++;
context.write(new Text(key1), key);
}
}
}
Reducer 的 运行() 方法执行一次,它为每个键调用 reduce() 方法。以下是 Reducer 的 运行() 方法的默认代码。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
因此,如果您在 reduce() 方法中定义了 count 变量,那么每次(对于每个键)都会对其进行初始化。而是在你的 reducer 实现中覆盖 Reducer 的这个 运行() 方法,并将计数变量移动到这个 运行() 方法。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
int count=0;
try {
while (context.nextKey() && count<5) {
count++;
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<Text> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<Text>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
这应该有效。
我有一个映射器 class 可以生成几十行。然后通过 mapreduce 内部框架对该输出进行排序和合并。排序后,我只想获得 reducer 输出的前 5 条记录。我怎样才能做到这一点? 我维护了一个计数变量,它在 reduce 方法中递增。但这不起作用,它在输出中给出所有记录。我认为这是因为 reduce class 被调用到 reducer 的每个输入行,所以 count 每次都被初始化为 0。有没有办法维护全局变量?
public class Reduce2 扩展 Reducer{
int count=0;
@Override
protected void reduce(IntWritable1 key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
int count=0;
String key1 = "";
for(Text value:values) {
key1+=value;
}
if(count<5) {
count++;
context.write(new Text(key1), key);
}
}
}
运行() 方法执行一次,它为每个键调用 reduce() 方法。以下是 Reducer 的 运行() 方法的默认代码。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
因此,如果您在 reduce() 方法中定义了 count 变量,那么每次(对于每个键)都会对其进行初始化。而是在你的 reducer 实现中覆盖 Reducer 的这个 运行() 方法,并将计数变量移动到这个 运行() 方法。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
int count=0;
try {
while (context.nextKey() && count<5) {
count++;
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<Text> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<Text>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
这应该有效。