地图减少计数器的条件以控制地图输出
Condition on map reduce counters to control the map output
是否有机会在映射器级别控制映射器输出的用户定义 java 计数器上设置条件??
Long l = context.getCounter(Counters.COUNT).getValue();
if(5L >= l) {
context.getCounter(Counters.COUNT).increment(1);
context.write((LongWritable)key, value);
} else {
System.out.println("MAP ELSE");
return;
}
将超过 5 个记录输入到 reducer。
有机会控制这个吗???
你不能那样做,如果你的输入文件有 3 个分割,那么你将有 3 个映射器 运行。每个映射器都有其单独的计数值(取决于如何增加计数值的逻辑),只有在洗牌阶段之后所有映射器完成后,该计数值才会在 reduce 端已知。
如果你想限制你的地图输出。然后有一个 reducer job.setNumReduceTasks(1)
并限制 reducer 的输出。像这样。
public static class WLReducer2 extends
Reducer<IntWritable, Text, Text, IntWritable> {
int count=0;
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
for (Text x : values) {
if (count < 5)
context.write(key, x);
count++;
}
};
}
如果你想获取reduce端的计数器值。您可以将其添加到 reduce 设置方法中。
@Override
public void setup(Context context) throws IOException, InterruptedException{
Configuration conf = context.getConfiguration();
Cluster cluster = new Cluster(conf);
Job currentJob = cluster.getJob(context.getJobID());
mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}
是否有机会在映射器级别控制映射器输出的用户定义 java 计数器上设置条件??
Long l = context.getCounter(Counters.COUNT).getValue();
if(5L >= l) {
context.getCounter(Counters.COUNT).increment(1);
context.write((LongWritable)key, value);
} else {
System.out.println("MAP ELSE");
return;
}
将超过 5 个记录输入到 reducer。 有机会控制这个吗???
你不能那样做,如果你的输入文件有 3 个分割,那么你将有 3 个映射器 运行。每个映射器都有其单独的计数值(取决于如何增加计数值的逻辑),只有在洗牌阶段之后所有映射器完成后,该计数值才会在 reduce 端已知。
如果你想限制你的地图输出。然后有一个 reducer job.setNumReduceTasks(1)
并限制 reducer 的输出。像这样。
public static class WLReducer2 extends
Reducer<IntWritable, Text, Text, IntWritable> {
int count=0;
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
for (Text x : values) {
if (count < 5)
context.write(key, x);
count++;
}
};
}
如果你想获取reduce端的计数器值。您可以将其添加到 reduce 设置方法中。
@Override
public void setup(Context context) throws IOException, InterruptedException{
Configuration conf = context.getConfiguration();
Cluster cluster = new Cluster(conf);
Job currentJob = cluster.getJob(context.getJobID());
mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}