MapReduce function in Java to calculate in-degree and out-degree and show the totals
I am trying to sum the in-degree and out-degree for a set of data.
Here is the sample data:
Source Target
1 2
2 1
3 1
2 3
So the expected output is:
ID In degree Out degree
1 2 1
2 1 2
3 1 1
How can I implement this with MapReduce in Java and print the result for each node on a single line?
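One option uses a single MR job.
Assuming the original dataset consists of [node1,node2] pairs:
- the mapper reads the original dataset and, for each line, emits the pairs [node1,out] and [node2,in]
- the reducer receives pairs of the form [key,label] from the mapper, computes the out-degree and in-degree by counting the "out" labels and the "in" labels separately for each key, and emits the result as [key, indegree, outdegree]
The implementation would look something like the following (assuming node1 and node2 are separated by a space in the dataset, and also assuming the dataset contains only distinct pairs):
Mapper: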
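import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class YourMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Each input line is one edge: "node1 node2"
        String line = value.toString().trim();
        String[] line_spl = line.split("\\s+");
        if (line_spl.length < 2) {
            return; // skip blank or malformed lines instead of failing on line_spl[1]
        }
        Text node1_txt = new Text(line_spl[0]);
        Text node2_txt = new Text(line_spl[1]);
        output.collect(node1_txt, new Text("out")); // node1 has one outgoing edge
        output.collect(node2_txt, new Text("in"));  // node2 has one incoming edge
    }//end map function
}//end mapper class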
Reducer:
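import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class YourReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        int count_outs = 0;
        int count_ins = 0;
        // Count the "out" and "in" labels the mappers emitted for this node
        while (values.hasNext()) {
            String value_str = values.next().toString();
            if (value_str.equals("out")) {
                count_outs++;
            } else if (value_str.equals("in")) {
                count_ins++;
            }
        }
        // The value is "indegree outdegree", so each node ends up on one output line
        Text out = new Text(count_ins + " " + count_outs);
        output.collect(key, out);
    }//end reduce function
}//end reducer class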
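
To sanity-check the logic without a cluster, the same map, group-by-key, and reduce steps can be simulated in plain Java. This is only a rough sketch: the class name DegreeCountSketch and its methods are made up for illustration and are not part of Hadoop, and a TreeMap stands in for the shuffle phase.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the job above; no Hadoop classes are used.
class DegreeCountSketch {

    // Map step: one "node1 node2" line becomes (node1, "out") and (node2, "in").
    static List<String[]> map(String line) {
        List<String[]> pairs = new ArrayList<>();
        String[] parts = line.trim().split("\\s+");
        if (parts.length < 2) {
            return pairs; // blank or malformed line: emit nothing
        }
        pairs.add(new String[] {parts[0], "out"});
        pairs.add(new String[] {parts[1], "in"});
        return pairs;
    }

    // Reduce step: count the labels for one key and format "key<TAB>indegree outdegree".
    static String reduce(String key, List<String> labels) {
        int countIns = 0;
        int countOuts = 0;
        for (String label : labels) {
            if (label.equals("out")) {
                countOuts++;
            } else if (label.equals("in")) {
                countIns++;
            }
        }
        return key + "\t" + countIns + " " + countOuts;
    }

    // Stand-in for the shuffle: group all emitted labels by key, then reduce each group.
    static List<String> run(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String[] pair : map(line)) {
                grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
            }
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            rows.add(reduce(entry.getKey(), entry.getValue()));
        }
        return rows;
    }

    public static void main(String[] args) {
        // The sample edges from the question, without the "Source Target" header line
        List<String> edges = Arrays.asList("1 2", "2 1", "3 1", "2 3");
        for (String row : run(edges)) {
            System.out.println(row);
        }
    }
}
```

For the sample edges this gives node 1 an in-degree of 2 and an out-degree of 1, node 2 an in-degree of 1 and an out-degree of 2, and node 3 one of each, which matches the expected output. Note that the "Source Target" header line has to be dropped from the input first, otherwise it would be counted as an edge.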