无法识别我的 Reducer 连接代码中的错误
Unable to indentify the bug in my Reducer join code
我有两个数据集:
用户:
Bobby 06 Amsterdam
Sunny 07 Rotterdam
Steven 08 Liverpool
Jamie 23 Liverpool
Macca 91 Liverpool
Messi 10 Barcelona
Pique 04 Barcelona
Suarez 09 Barcelona
Neymar 11 brazil
Klopp 12 Liverpool
用户日志:
Sunny NewPlayer 12.23.14.421
Klopp Crazy 88.33.44.555
Bobby NewPlayer 99.12.11.222
Steven Captain 99.55.66.777
Jamie Local 88.99.33.232
Suarez Spain 77.55.66.444
我想使用 reducer join 来连接这两个数据集。
我这样写我的类:
映射器Class:
Public class MapperClass {
public static class UserMap extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(" ");
String name = tokens[0];
String city = tokens[2];
context.write(new Text(name), new Text("UserFile" + "\t" + city));
}
}
public static class UserLogs extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(" ");
String name = tokens[0];
String ip = tokens[2];
context.write(new Text(name), new Text("UserLogs" + "\t" + ip));
}
}
}
减速机Class:
public class ReducerClass extends Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String city = null;
String ip = null;
for(Text t: values) {
String[] parts = t.toString().split("\t");
if(parts[0].equals("UserFile")) {
city = parts[1];
}
if(parts[0].equals("UserLogs")) {
ip = parts[1];
} else {
ip = "IP Address not found";
}
}
context.write(key, new Text(city + "\t" + ip));
}
}
司机类:
public class MainClass {
public static void main(String[] args)throws IOException, InterruptedException, ClassNotFoundException {
Job job = new Job();
job.setJarByClass(MainClass.class);
job.setOutputKeyClass(Text.class);
job.setReducerClass(ReducerClass.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMap.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, UserLogs.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
输出应该是这样的:
Bobby Amsterdam 99.12.11.222
Sunny Rotterdam 12.23.14.421
Klopp Liverpool 88.33.44.555
Steven Liverpool 99.55.66.777
Jamie Liverpool 88.99.33.232
Suarez Barcelona 77.55.66.444
相反,我得到这样的输出:
Bobby Amsterdam IP Address not found
Jamie Liverpool 88.99.33.232
Klopp Liverpool IP Address not found
Macca Liverpool IP Address not found
Messi Barcelona IP Address not found
Neymar brazil IP Address not found
Pique Barcelona IP Address not found
Steven Liverpool 99.55.66.777
Suarez Barcelona IP Address not found
Sunny Rotterdam 12.23.14.421
我不明白我在这里犯了什么错误。
谁能帮我解决这个问题。
非常感谢任何形式的帮助。
你的 reducer 有一个错误,它根据 values
顺序覆盖了 IP 地址。试试这个:
public class ReducerClass extends Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String city = null;
String ip = null;
for(Text t: values) {
String[] parts = t.toString().split("\t");
if(parts[0].equals("UserFile")) {
city = parts[1];
} else if(parts[0].equals("UserLogs")) {
ip = parts[1];
}
}
if (ip != null && city != null) {
context.write(key, new Text(city + "\t" + ip));
}
}
}
我有两个数据集:
用户:
Bobby 06 Amsterdam
Sunny 07 Rotterdam
Steven 08 Liverpool
Jamie 23 Liverpool
Macca 91 Liverpool
Messi 10 Barcelona
Pique 04 Barcelona
Suarez 09 Barcelona
Neymar 11 brazil
Klopp 12 Liverpool
用户日志:
Sunny NewPlayer 12.23.14.421
Klopp Crazy 88.33.44.555
Bobby NewPlayer 99.12.11.222
Steven Captain 99.55.66.777
Jamie Local 88.99.33.232
Suarez Spain 77.55.66.444
我想使用 reducer join 来连接这两个数据集。 我这样写我的类:
映射器Class:
Public class MapperClass {
public static class UserMap extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(" ");
String name = tokens[0];
String city = tokens[2];
context.write(new Text(name), new Text("UserFile" + "\t" + city));
}
}
public static class UserLogs extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] tokens = line.split(" ");
String name = tokens[0];
String ip = tokens[2];
context.write(new Text(name), new Text("UserLogs" + "\t" + ip));
}
}
}
减速机Class:
public class ReducerClass extends Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String city = null;
String ip = null;
for(Text t: values) {
String[] parts = t.toString().split("\t");
if(parts[0].equals("UserFile")) {
city = parts[1];
}
if(parts[0].equals("UserLogs")) {
ip = parts[1];
} else {
ip = "IP Address not found";
}
}
context.write(key, new Text(city + "\t" + ip));
}
}
司机类:
public class MainClass {
public static void main(String[] args)throws IOException, InterruptedException, ClassNotFoundException {
Job job = new Job();
job.setJarByClass(MainClass.class);
job.setOutputKeyClass(Text.class);
job.setReducerClass(ReducerClass.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMap.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, UserLogs.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
输出应该是这样的:
Bobby Amsterdam 99.12.11.222
Sunny Rotterdam 12.23.14.421
Klopp Liverpool 88.33.44.555
Steven Liverpool 99.55.66.777
Jamie Liverpool 88.99.33.232
Suarez Barcelona 77.55.66.444
相反,我得到这样的输出:
Bobby Amsterdam IP Address not found
Jamie Liverpool 88.99.33.232
Klopp Liverpool IP Address not found
Macca Liverpool IP Address not found
Messi Barcelona IP Address not found
Neymar brazil IP Address not found
Pique Barcelona IP Address not found
Steven Liverpool 99.55.66.777
Suarez Barcelona IP Address not found
Sunny Rotterdam 12.23.14.421
我不明白我在这里犯了什么错误。 谁能帮我解决这个问题。 非常感谢任何形式的帮助。
你的 reducer 有一个错误,它根据 values
顺序覆盖了 IP 地址。试试这个:
public class ReducerClass extends Reducer<Text, Text, Text, Text>{
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String city = null;
String ip = null;
for(Text t: values) {
String[] parts = t.toString().split("\t");
if(parts[0].equals("UserFile")) {
city = parts[1];
} else if(parts[0].equals("UserLogs")) {
ip = parts[1];
}
}
if (ip != null && city != null) {
context.write(key, new Text(city + "\t" + ip));
}
}
}