Java Hadoop MapReduce Chaining Job
I have some code that correctly picks the source and the highest weight, but I can't seem to also pull in the target column. Can someone point me in the right direction? I have never used Java before. I think the reducer function needs to return a tuple, so does the target variable in the mapper function need to hold that tuple?
Desired output: each line contains a node ID, followed by a tab (\t) and the expected "tgt,weight" tuple. The tuple is the tgt with the highest weight; if there is a tie, return the tgt with the smallest number.
Input
src tgt weight
1 110 3
1 200 1
20 150 30
10 110 10
11 130 15
11 200 67
1 70 3
Expected output
1 70,3
20 150,30
10 110,10
11 200,67
Current output (the tgt column still needs to be included to form the tuple)
1 3
20 30
10 10
11 67
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Q1 {
    public static class TargetMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text target = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                target.set(edge[0]);
                context.write(target, new IntWritable(Integer.parseInt(edge[2])));
            }
        }
    }

    public static class EmailsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable totalCount = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> targets, Context context) throws IOException, InterruptedException {
            int max = 0;
            for (IntWritable target : targets) {
                if (target.get() > max || max == 0) {
                    max = target.get();
                }
            }
            totalCount.set(max);
            context.write(key, totalCount);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");
        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
You are interested in a custom output. For that, try implementing a custom WritableComparable. You may need to adjust the logic to make it work the way you need. Something like:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
public class MyWritable implements WritableComparable<MyWritable> {
    private IntWritable tgt;
    private IntWritable weight;

    public MyWritable() {
        set(new IntWritable(), new IntWritable());
    }

    public MyWritable(int tgt, int weight) {
        set(new IntWritable(tgt), new IntWritable(weight));
    }

    public MyWritable(IntWritable tgt, IntWritable weight) {
        set(tgt, weight);
    }

    public IntWritable getTgt() {
        return tgt;
    }

    public IntWritable getWeight() {
        return weight;
    }

    public void set(IntWritable tgt, IntWritable weight) {
        this.tgt = tgt;
        this.weight = weight;
    }

    @Override
    public int compareTo(MyWritable o) {
        int cmp = tgt.compareTo(o.tgt);
        if (cmp == 0) {
            return weight.compareTo(o.weight);
        }
        return cmp;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        tgt.write(dataOutput);
        weight.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        tgt.readFields(dataInput);
        weight.readFields(dataInput);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyWritable that = (MyWritable) o;
        return Objects.equals(tgt, that.tgt) &&
                Objects.equals(weight, that.weight);
    }
    @Override
    public int hashCode() {
        return Objects.hash(tgt, weight);
    }

    @Override
    public String toString() {
        // TextOutputFormat writes the reduce value with toString(), so this
        // is what produces the "tgt,weight" column in the output file.
        return tgt + "," + weight;
    }
}
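A small note on the class above: because MyWritable is only used as a value here, Hadoop strictly only needs write() and readFields(); compareTo(), equals() and hashCode() would only matter if it were ever used as a key or for sorting, though keeping them does no harm. The toString() override is what makes the final output readable, since TextOutputFormat renders each value by calling toString().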
Then update your code to use it as the value type in the Mapper and Reducer, like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class Q1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");
        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyWritable.class);
        job.setMapOutputValueClass(MyWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static class TargetMapper extends Mapper<Object, Text, Text, MyWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                // each record is "src \t tgt \t weight": key by src, carry (tgt, weight) as the value
                String[] edge = st.nextToken().split("\t");
                Text target = new Text();
                target.set(edge[0]);
                int tgt = Integer.parseInt(edge[1]);
                int weight = Integer.parseInt(edge[2]);
                context.write(target, new MyWritable(tgt, weight));
            }
        }
    }
    public static class EmailsReducer extends Reducer<Text, MyWritable, Text, MyWritable> {
        private MyWritable res = new MyWritable();

        public void reduce(Text key, Iterable<MyWritable> targets, Context context) throws IOException, InterruptedException {
            int maxWeight = Integer.MIN_VALUE;
            int maxTgt = Integer.MIN_VALUE;
            for (MyWritable target : targets) {
                int weight = target.getWeight().get();
                int tgt = target.getTgt().get();
                // keep the highest weight; on a tie, keep the smallest tgt
                if (weight > maxWeight || (weight == maxWeight && tgt < maxTgt)) {
                    maxWeight = weight;
                    maxTgt = tgt;
                }
            }
            res.set(new IntWritable(maxTgt), new IntWritable(maxWeight));
            context.write(key, res);
        }
    }
}
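If you only care about the literal "tgt,weight" text in the final output, a lighter variation (not part of the original answer, just a sketch under the same assumptions) is to keep MyWritable as the map output value but let the reducer emit a plain Text value. The driver would then call job.setOutputValueClass(Text.class) while keeping job.setMapOutputValueClass(MyWritable.class), and EmailsReducer could no longer double as the combiner, since a combiner's output types must match the map output types.

// Sketch of the variation: the reducer builds the "tgt,weight" string itself.
public static class EmailsReducer extends Reducer<Text, MyWritable, Text, Text> {
    private final Text res = new Text();

    @Override
    public void reduce(Text key, Iterable<MyWritable> targets, Context context)
            throws IOException, InterruptedException {
        int maxWeight = Integer.MIN_VALUE;
        int maxTgt = Integer.MIN_VALUE;
        for (MyWritable target : targets) {
            int weight = target.getWeight().get();
            int tgt = target.getTgt().get();
            // highest weight wins; a tie goes to the smallest tgt
            if (weight > maxWeight || (weight == maxWeight && tgt < maxTgt)) {
                maxWeight = weight;
                maxTgt = tgt;
            }
        }
        res.set(maxTgt + "," + maxWeight);
        context.write(key, res);
    }
}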