Hadoop 多输入错误分组 - 双向连接练习

Hadoop Multiple Inputs wrongly grouped - Two Way Join Exercise

我正在尝试研究一些 hadoop,并阅读了很多有关如何进行自然连接的内容。我有两个包含密钥和信息的文件,我想交叉并将最终结果显示为 (a, b, c)。

我的问题是映射器正在为每个文件调用缩减器。我期待收到类似 (10, [R1,S10, S22]) 的东西(作为键的 10,1、10、22 是不同行的值,它们以 10 作为键,R 和 S 正在标记,因此我可以识别他们来自table。

问题是我的 reducer 接收到 (10, [S10, S22]) 并且只有在完成所有 S 文件后我才得到另一个键值对,如 (10, [R1])。这意味着,它为每个文件分别按键分组并调用 reducer

我不确定这是否是正确的行为,我是否必须以不同的方式配置它,或者我是否做错了一切。

我也是 java 的新手,所以代码对您来说可能看起来很糟糕。

我正在避免使用 TextPair 数据类型,因为我自己还不能想出那个,我认为这将是另一种有效的方式(以防万一你想知道)。谢谢

运行 基于 WordCount 示例的 hadoop 2.4.1。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;

public class TwoWayJoin {

    public static class FirstMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            Text a = new Text();
            Text b = new Text();

            a.set(tokenizer.nextToken());
            b.set(tokenizer.nextToken());

            output.collect(b, relation);
        }
    }

    public static class SecondMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);

            Text b = new Text();
            Text c = new Text();

            b.set(tokenizer.nextToken());
            c.set(tokenizer.nextToken());

            Text relation = new Text("S"+c.toString());

            output.collect(b, relation);

        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            ArrayList < Text > RelationS = new ArrayList < Text >() ;
            ArrayList < Text > RelationR = new ArrayList < Text >() ;

            while (values.hasNext()) {
                String relationValue = values.next().toString();
                if (relationValue.indexOf('R') >= 0){
                    RelationR.add(new Text(relationValue));
                } else {
                    RelationS.add(new Text(relationValue));
                }
            }

            for( Text r : RelationR ) {
                for (Text s : RelationS) {
                    output.collect(key, new Text(r + "," + key.toString() + "," + s));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MultipleInputs.class);
        conf.setJobName("TwoWayJoin");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, FirstMap.class);
        MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, SecondMap.class);

        Path output = new Path(args[2]); 

        FileOutputFormat.setOutputPath(conf, output);

        FileSystem.get(conf).delete(output, true);

        JobClient.runJob(conf);

    }
}

R.txt

(a  b(key))
2   46
1   10
0   24
31  50
11  2
5   31
12  36
9   46
10  34
6   31

S.txt

(b(key)  c)
45  32
45  45
46  10
36  15
45  21
45  28
45  9
45  49
45  18
46  21
45  45
2   11
46  15
45  33
45  6
45  20
31  28
45  32
45  26
46  35
45  36
50  49
45  13
46  3
46  8
31  45
46  18
46  21
45  26
24  15
46  31
46  47
10  24
46  12
46  36

此代码的输出成功但为空,因为我的数组 R 为空或数组 S 为空。

如果我只是一一收集而不进行任何处理,我已经映射了所有行。

预期输出为

key  "a,b,c"

问题出在组合器上。请记住,combiner 在 map 输出上应用 reduce 函数。所以它间接地做的是 reduce 函数分别应用于您的 R 和 S 关系,这就是您在不同的 reduce 调用中获得 R 和 S 关系的原因。 注释掉

conf.setCombinerClass(Reduce.class);

再试运行应该没有问题。附带一提,组合器功能只有在您感觉地图输出的聚合结果与排序和洗牌完成后应用于输入时相同时才有用。