计数器在减速器代码中不起作用

Counter is not working in reducer code

我正在做一个大的 hadoop 项目,有一个小的 KPI,我必须在减少输出中只写前 10 个值。 为了完成这个要求,我使用了一个计数器并在计数器等于 11 时中断循环,但 reducer 仍然将所有值写入 HDFS。

这是一个非常简单的 java 代码,但我卡住了:(

为了测试,我创建了一个独立的 class(java 应用程序)来执行此操作,并且它在那里工作;我想知道为什么它在 reducer 代码中不起作用。

如果我遗漏了什么,请有人帮助我并提出建议。

地图 - 减少代码

package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);
        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);


        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

            public void  map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {

                String tokens[]= value.toString().split("\t");

            //    int empId = Integer.parseInt(tokens[0])    ;    
                int count = Integer.parseInt(tokens[2])    ;

                context.write(new IntWritable(count), new Text(value));

            }    

        }


    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
        int cnt=0;
        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {


            for (Text value : list ) {
                cnt ++;

                if (cnt==11)
                {
                    break;    
                }

                context.write(new IntWritable(cnt), value);




            }

        }
}
}  

简单 JAVA 代码工作正常

package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;


    public static void main(String args[]) throws IOException, InterruptedException {

        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};


        reduce(list);


    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {


        int cnt=0;
        for (String value : list ) {
            cnt ++;

            if (cnt==3)
            {
                break;    
            }

            System.out.println(value);    


        }

    }
}

示例数据--Header只是更多信息,实际数据来自第二行

`ID NAME COUNT(需要显示前 10 个 desc)

1 玩具总动员 (1995) 2077

10 黄金眼 (1995) 888

100 市政厅 (1996) 128

1000 凝结 (1996) 20

1001 Associate, The (L'Associe)(1982) 0

1002 埃德的下一步 (1996) 8

1003 极端措施 (1996) 121

1004 微光男 (1996) 101

1005 D3:威猛鸭 (1996) 142

1006 室, (1996) 78

1007 苹果饺子帮 (1975) 232

1008 荒野之王戴维·克罗克特 (1955) 97

1009 逃往 山 (1975) 291

101 瓶火箭 (1996) 253

1010 爱情虫 (1969) 242

1011 赫比再次骑行 (1974) 135

1012 老耶勒 (1957) 301

1013 Parent 陷阱 (1961) 258

1014 盲目乐观 (1960) 136

1015 归途:不可思议的旅程 (1993) 234

1016 毛茸茸的狗 (1959) 156

1017 瑞士家族罗宾逊 (1960) 276

1018 那只该死的猫! (1965) 123

1019 20,000 海里 (1954) 575

102 错先生 (1996) 60

1020 酷跑 (1993) 392

1021 外野天使 (1994) 247

1022 灰姑娘 (1950) 577

1023 小熊维尼与大风天 (1968) 221

1024 三个骑士,(1945) 126

1025 石中剑 (1963) 293

1026 心心相印 (1949) 8

1027 罗宾汉之盗贼王子 (1991) 344

1028 欢乐满人间 (1964) 1011

1029 小飞象 (1941) 568

103 难忘 (1996) 33

1030 皮特的龙 (1977) 323

1031 床把手和扫帚 (1971) 319`

如果您在 reduce 方法中移动 int cnt=0;(作为此方法的第一条语句),您将获得每个键的前 10 个值(我想这就是您想要的)。

否则,就像现在一样,您的计数器将继续增加,您将仅跳过第 11 个值(无论键如何),继续第 12 个。

如果您只想打印 10 个值(不管键是什么),请将 cnt 初始化保留在原处,并将 if 条件更改为 if (cnt > 10)... 但是,这不是一个好的做法,因此您可能需要重新考虑您的算法。 (假设你不想要 10 个随机值,当你有超过 1 个 reducer 和一个散列分区器时,你怎么知道在分布式环境中首先处理哪个键?)