Joining multiple files using MapReduce
I want to join the contents of two files:
First file (employee names)
id,name
101,Gaurav
102,Rohit
103,Karishma
104,Darshan
105,Divya
Second file (employee departments)
id,dept
101,Sales
102,Research
103,NMG
104,Admin
105,HR
============================
Output
id,name,dept
101,Gaurav,Sales
102,Rohit,Research
103,Karishma,NMG
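How can I produce this output?
So far my reducer emits the values in arbitrary order, e.g. ..
I want the output in a fixed order: id, name, department.
Any help is appreciated.
The mapper class looks like this...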
public class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text keyEmit = new Text();
    private Text valEmit = new Text();

    public void map(LongWritable k, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(",");
        keyEmit.set(words[0]);
        valEmit.set(words[1]);
        context.write(keyEmit, valEmit);
    }
}
The reducer class looks like this...
public class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    String merge = "";

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        merge = key.toString(); // 101
        for (Text value : values) {
            merge += "," + value.toString();
        }
        context.write(NullWritable.get(), new Text(merge));
    }
}
The driver class looks like this...
public class JoinDriver {
    public final static void main(final String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Multiple join");
        job.setJarByClass(JoinDriver.class);
        // job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, JoinMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The current output is as shown below; I want it ordered as id, name, department.
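The main problem is that the values are not sorted. You group by the common key, but sending the values as bare strings does not help much, because the reducer cannot tell which one is the name and which one is the department.
You have a few options, all of which require sending more information from the mapper:
- Use a secondary sort
- Sort the values in the reducer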
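The second option can be sketched outside Hadoop to show the idea. This is only an illustration under assumptions: the class name `SortedValuesSketch`, the method `mergeSorted`, and the tags `0:` (name) and `1:` (department) are hypothetical and do not come from the original code. The mapper would prepend a tag that sorts in the desired field order, and the reducer would buffer the values, sort them, and strip the tags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortedValuesSketch {
    // Hypothetical tags: "0:" marks a name and "1:" marks a department,
    // so plain string sorting puts the name before the department.
    static String mergeSorted(String key, List<String> taggedValues) {
        List<String> sorted = new ArrayList<>(taggedValues);
        Collections.sort(sorted);
        StringBuilder merged = new StringBuilder(key);
        for (String tagged : sorted) {
            // Drop the two-character tag before appending the payload.
            merged.append(',').append(tagged.substring(2));
        }
        return merged.toString();
    }

    public static void main(String[] args) {
        // The values arrive in the "wrong" order on purpose.
        System.out.println(mergeSorted("101", Arrays.asList("1:Sales", "0:Gaurav")));
    }
}
```

In a real reducer, each `Text` would need to be copied with `value.toString()` before buffering, because Hadoop reuses the same `Text` instance while iterating over the values.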
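The quickest approach is to attach extra information to the value when the mapper emits it (ideally you would use a composite value containing two Text objects).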
public class JoinMapperName extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable k, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(",");
        context.write(new Text(words[0]), new Text("name:" + words[1]));
    }
}

public class JoinMapperDept extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable k, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(",");
        context.write(new Text(words[0]), new Text("dept:" + words[1]));
    }
}
Each data source now has its own mapper. Change the reducer to:
public class JoinReducer extends Reducer<Text, Text, NullWritable, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String name = "";
        String dept = "";
        for (Text value : values) {
            // Split on the first ':' only, so payloads containing ':' survive.
            String[] parts = value.toString().split(":", 2);
            if (parts[0].equals("name")) {
                name = parts[1];
            } else {
                dept = parts[1];
            }
        }
        // Fixed field order: id, name, dept.
        String merge = key.toString() + "," + name + "," + dept;
        context.write(NullWritable.get(), new Text(merge));
    }
}
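With two mapper classes, the driver also has to register each mapper against its own input path. The following is a sketch of how the question's driver might be adapted, not a tested job: it assumes `args[0]` is the names file and `args[1]` is the departments file, and it needs the Hadoop MapReduce libraries on the classpath. It also sets the output key class to `NullWritable` to match what the reducer emits.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JoinDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "Multiple join");
        job.setJarByClass(JoinDriver.class);
        job.setReducerClass(JoinReducer.class);

        // Assumption: args[0] holds id,name and args[1] holds id,dept.
        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, JoinMapperName.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, JoinMapperDept.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // The reducer writes NullWritable keys and Text values.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```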
This is just a simple example of how you could do it. Hopefully it gives you some ideas about how to enforce the ordering.
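The composite value mentioned above could look roughly like the sketch below. It is a plain-Java illustration, not the answer's code: the class name `TaggedValue` and the helper `roundTrip` are hypothetical. The `write`/`readFields` pair mirrors the shape of Hadoop's `Writable` interface, and in a real job the class would declare `implements org.apache.hadoop.io.Writable`; that is left out here so the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TaggedValue {
    private String tag = "";      // which source the record came from, e.g. "name" or "dept"
    private String payload = "";  // the field value itself

    public TaggedValue() { }      // no-arg constructor, needed for deserialization

    public TaggedValue(String tag, String payload) {
        this.tag = tag;
        this.payload = payload;
    }

    public String getTag() { return tag; }
    public String getPayload() { return payload; }

    // Serialize both fields separately, so the payload may contain any character.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(tag);
        out.writeUTF(payload);
    }

    // Read the fields back in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        tag = in.readUTF();
        payload = in.readUTF();
    }

    // Hypothetical helper: serialize to bytes and back, as a shuffle would.
    static TaggedValue roundTrip(TaggedValue original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            TaggedValue copy = new TaggedValue();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TaggedValue copy = roundTrip(new TaggedValue("dept", "R:D"));
        System.out.println(copy.getTag() + " -> " + copy.getPayload());
    }
}
```

Keeping the tag and the payload in separate fields avoids parsing a `tag:value` string in the reducer, so a payload that itself contains `:` needs no special handling.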