卡在 Reduce Join 代码中
Stuck in the Reduce Join Code
我有两个数据集。两者都在下面给出
第一个数据集
1 A
2 B
3 C
4 D
5 E
第二个数据集
1 ALPHA
2 BRAVO
3 CHARLIE
4 DELTA
5 ECHO
我想使用reduce side join
加入这个数据集
最终数据应该是这样的
A ALPHA
B BRAVO
C CHARLIE
D DELTA
E ECHO
我写了下面的代码
Mapper(从第一个数据集中提取数据)
public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
}
}
Mapper(从第二个数据集中提取数据)
public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
}
}
Reducer(根据需要加入数据)
public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
private String output1=new String();
private String output2=new String();
private TreeMap<String,String> x1=new TreeMap<String,String>();
private String tokens[];
public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
cleanup(context);
}
public void cleanup(Context context)throws IOException,InterruptedException{
for(Entry y:x1.entrySet())
{
context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
}
}
}
在驱动程序中 class 已包含以下行
MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);
我得到如下所示的输出,这完全不是我想要的。
1 m1 A
1 m2 ALPHA
2 m2 BRAVO
2 m1 B
3 m1 C
3 m2 CHARLIE
4 m2 DELTA
4 m1 D
5 m1 E
5 m2 ECHO
尽管我没有将索引包含在 context.write()
中,但我绝对无法弄清楚为什么要打印索引
我什至使用了 cleanup(),仍然得到相同的结果。
请建议如何获得上面给出的所需结果。
衷心感谢让我摆脱困境的人:)
稍后经过一些修改,我得到了这个输出
m1 E
m1 D
m1 C
m1 B
m1 A
m2 ECHO
m2 DELTA
m2 CHARLIE
m2 BRAVO
m2 ALPHA
reducer 方法应该将键和可迭代值作为参数。每个减速器都会有以下格式的数据
{1,{"m1 A","m2 ALPHA"}},{1,{"m2 BA","m2 BRAVO"}}。
请重新检查减速器方法的签名。我假设一旦解决了这个问题,并且如果您的数据是一对一的,您就可以进行相应的映射。如果是一对多,您可能有多个 m1 或 m2,为此,您需要决定如何管理多个值(映射保持逗号分隔或在 json 或 xml 字符串中) 然后输出最终值。
修改后的代码可能如下
public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{
for(Text value : values) {
tokens=values.toString().split("\t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
}
cleanup(context);
}
我有两个数据集。两者都在下面给出
第一个数据集
1 A
2 B
3 C
4 D
5 E
第二个数据集
1 ALPHA
2 BRAVO
3 CHARLIE
4 DELTA
5 ECHO
我想使用reduce side join
加入这个数据集
最终数据应该是这样的
A ALPHA
B BRAVO
C CHARLIE
D DELTA
E ECHO
我写了下面的代码
Mapper(从第一个数据集中提取数据)
public class indMapper extends Mapper<Object, Text,IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m1"+"\t"+tokens[1].trim()));
}
}
Mapper(从第二个数据集中提取数据)
public class AlphaMapper extends Mapper<Object, Text, IntWritable, Text> {
private String tokens[];
public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
context.write(new IntWritable(Integer.parseInt(tokens[0].toString().trim())), new Text("m2"+"\t"+tokens[1].trim()));
}
}
Reducer(根据需要加入数据)
public class JoinReducer extends Reducer<IntWritable, Text, Text, Text> {
private String output1=new String();
private String output2=new String();
private TreeMap<String,String> x1=new TreeMap<String,String>();
private String tokens[];
public void reduce(IntWritable key,Text value,Context context)throws IOException,InterruptedException{
tokens=value.toString().split("\t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
cleanup(context);
}
public void cleanup(Context context)throws IOException,InterruptedException{
for(Entry y:x1.entrySet())
{
context.write(new Text(" "), new Text(y.getKey().toString()+","+y.getValue().toString()));
}
}
}
在驱动程序中 class 已包含以下行
MultipleInputs.addInputPath(j, new Path(arg0[0]),TextInputFormat.class,indMapper.class);
MultipleInputs.addInputPath(j, new Path(arg0[1]),TextInputFormat.class,AlphaMapper.class);
我得到如下所示的输出,这完全不是我想要的。
1 m1 A
1 m2 ALPHA
2 m2 BRAVO
2 m1 B
3 m1 C
3 m2 CHARLIE
4 m2 DELTA
4 m1 D
5 m1 E
5 m2 ECHO
尽管我没有将索引包含在 context.write()
中,但我绝对无法弄清楚为什么要打印索引
我什至使用了 cleanup(),仍然得到相同的结果。
请建议如何获得上面给出的所需结果。
衷心感谢让我摆脱困境的人:)
稍后经过一些修改,我得到了这个输出
m1 E
m1 D
m1 C
m1 B
m1 A
m2 ECHO
m2 DELTA
m2 CHARLIE
m2 BRAVO
m2 ALPHA
reducer 方法应该将键和可迭代值作为参数。每个减速器都会有以下格式的数据
{1,{"m1 A","m2 ALPHA"}},{1,{"m2 BA","m2 BRAVO"}}。
请重新检查减速器方法的签名。我假设一旦解决了这个问题,并且如果您的数据是一对一的,您就可以进行相应的映射。如果是一对多,您可能有多个 m1 或 m2,为此,您需要决定如何管理多个值(映射保持逗号分隔或在 json 或 xml 字符串中) 然后输出最终值。
修改后的代码可能如下
public void reduce(IntWritable key,Iterabale<Text> values,Context context)throws IOException,InterruptedException{
for(Text value : values) {
tokens=values.toString().split("\t");
if(tokens[0].contains("m1"))
{
output1=tokens[1];
}else if(tokens[0].contains("m2"))
{
output2=(tokens[1]);
}
x1.put(output2, output1);
}
cleanup(context);
}