Accessing Mapper's counter in Reducer phase (before finishing the job)

As the title says, my goal is to access a Mapper's counter during the reduce phase, before the job has finished.

I came across several questions that are highly relevant to this one, but none of them solved all of my problems (Accessing a mapper's counter from a reducer, etc.).

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
    }

My problem is that the cluster does not contain any job history.

The way I invoke the MapReduce job:

    private void firstFrequents(String outpath) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Cluster cluster = new Cluster(conf);
        conf.setInt("minFreq", MIN_FREQUENCY);
        Job job = Job.getInstance(conf, "APR");
        // Counters counters = job.getCounters();
        job.setJobName("TotalTransactions");
        job.setJarByClass(AssociationRules.class);
        job.setMapperClass(FirstFrequentsMapper.class);
        job.setReducerClass(CandidateReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path(outpath));

        job.waitForCompletion(true);
    }

Mapper:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FirstFrequentsMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        public enum Counters {
            TotalTransactions
        }

        private IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\\t+|,+");
            int iter = 0;
            for (String string : line) {
                context.write(new Text(line[iter]), one);
                iter++;
            }
            context.getCounter(Counters.TotalTransactions).increment(1);
        }
    }

Reducer:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Reducer;

    public class CandidateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private int minFrequency;
        private long totalTransactions;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            minFrequency = conf.getInt("minFreq", 1);
            Cluster cluster = new Cluster(conf);
            Job currentJob = cluster.getJob(context.getJobID());
            totalTransactions = currentJob.getCounters()
                    .findCounter(FirstFrequentsMapper.Counters.TotalTransactions).getValue();
            System.out.print(totalTransactions);
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (IntWritable val : values) {
                counter += val.get();
            }

            /* Item frequency calculated */
            /* Write it to output if it is frequent */
            if (counter >= minFrequency) {
                context.write(key, new IntWritable(counter));
            }
        }
    }

The correct setup()/reduce() implementation for getting the value of the counter is exactly the one shown in the post that you mention:

    Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
    long counterValue = counter.getValue();

where TEST is the name of the counter, which is declared in the TestCounters enum.

I don't see the reason why you declare a Cluster variable...

Also, in the code that you mention in your comments, you should store the result of the getValue() method in a variable, like the counterValue variable above.
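
Applied to the CandidateReducer from the question, a minimal sketch of that suggestion could look like the following (my adaptation, reusing the minFreq key and the FirstFrequentsMapper.Counters enum shown above, and assuming org.apache.hadoop.mapreduce.Counter is imported):

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        minFrequency = conf.getInt("minFreq", 1);
        // Read the counter from the task context and keep its value,
        // instead of creating a Cluster and looking the job up again.
        Counter counter = context.getCounter(FirstFrequentsMapper.Counters.TotalTransactions);
        totalTransactions = counter.getValue();
    }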

Perhaps you will find this post useful, as well.

UPDATE: Based on your edit, I believe that what you are looking for is simply the number of MAP_INPUT_RECORDS, which is a default counter, so you do not need to re-implement it.

To get the value of a counter from the driver class, you can use (taken from this post):

    job.getCounters().findCounter(COUNTER_NAME).getValue();
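
For instance, here is a sketch of how the firstFrequents() driver above could read both the custom counter and the built-in MAP_INPUT_RECORDS counter after waitForCompletion(true) returns (the TaskCounter import from org.apache.hadoop.mapreduce and the println calls are my additions):

    job.waitForCompletion(true);

    // Custom counter incremented once per input record by FirstFrequentsMapper
    long totalTransactions = job.getCounters()
            .findCounter(FirstFrequentsMapper.Counters.TotalTransactions).getValue();

    // Built-in counter: total number of records read by all map tasks
    long mapInputRecords = job.getCounters()
            .findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

    System.out.println("TotalTransactions = " + totalTransactions);
    System.out.println("MAP_INPUT_RECORDS = " + mapInputRecords);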