卡在 Reduce Join 代码中

Stuck in the Reduce Join Code

我有两个数据集。两者都在下面给出
第一个数据集

1   A
2   B
3   C
4   D
5   E 


第二个数据集

1   ALPHA
2   BRAVO
3   CHARLIE
4   DELTA
5   ECHO


我想使用reduce side join
加入这个数据集 最终数据应该是这样的


A   ALPHA
B   BRAVO
C   CHARLIE
D   DELTA
E   ECHO


我写了下面的代码
Mapper(从第一个数据集中提取数据)

public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
    private String tokens[];
    public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
        tokens=value.toString().split("\t");
        context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
    }
}

Mapper(从第二个数据集中提取数据)

public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
        private String tokens[];
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("\t");
            context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
        }
}

Reducer(根据需要加入数据)

public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
    private String output1=new String();
    private String output2=new String();
    private TreeMap<String,String> x1=new TreeMap<String,String>();
    private String tokens[];
    public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split("\t");
            if(tokens[0].contains("m1"))
            {
                output1=tokens[1];
            }else if(tokens[0].contains("m2"))
            {
                output2=(tokens[1]);
            }
            x1.put(output2, output1);
        cleanup(context);

}
    public void cleanup(Context context)throws IOException,InterruptedException{

        for(Entry y:x1.entrySet())
        {
            context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
        }
    }
}

在驱动程序中 class 已包含以下行

MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);


我得到如下所示的输出,这完全不是我想要的。

1       m1      A
1       m2      ALPHA
2       m2      BRAVO
2       m1      B
3       m1      C
3       m2      CHARLIE
4       m2      DELTA
4       m1      D
5       m1      E
5       m2      ECHO

尽管我没有将索引包含在 context.write() 中,但我绝对无法弄清楚为什么要打印索引
我什至使用了 cleanup(),仍然得到相同的结果。 请建议如何获得上面给出的所需结果。

衷心感谢让我摆脱困境的人:)

稍后经过一些修改,我得到了这个输出

m1      E
m1      D
m1      C
m1      B
m1      A
m2      ECHO
m2      DELTA
m2      CHARLIE
m2      BRAVO
m2      ALPHA

reducer 方法应该将键和可迭代值作为参数。每个减速器都会有以下格式的数据

{1,{"m1 A","m2 ALPHA"}},{1,{"m2 BA","m2 BRAVO"}}。

请重新检查减速器方法的签名。我假设一旦解决了这个问题,并且如果您的数据是一对一的,您就可以进行相应的映射。如果是一对多,您可能有多个 m1 或 m2,为此,您需要决定如何管理多个值(映射保持逗号分隔或在 json 或 xml 字符串中) 然后输出最终值。

修改后的代码可能如下

 public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{

               for(Text value : values) {
                tokens=values.toString().split("\t");
                if(tokens[0].contains("m1"))
                {
                    output1=tokens[1];
                }else if(tokens[0].contains("m2"))
                {
                    output2=(tokens[1]);
                }
                x1.put(output2, output1);
               }
            cleanup(context);

    }