Hadoop - 按前缀聚合
Hadoop - aggregating by prefix
我有带前缀的词。例如:
city|new york
city|London
travel|yes
...
city|new york
我想数一下有多少city|new york
和city|London
(这是经典的wordcount)。但是,reducer 输出应该是像 city:{"new york" :2, "london":1}
这样的键值对。对于每个 city
前缀的含义,我想聚合所有字符串及其计数。
// Classic word-count reduce: sums the 1s emitted by the mapper for this key.
// NOTE(review): `result` is a field declared elsewhere in the enclosing
// reducer class (presumably an IntWritable) — not visible in this snippet.
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
// Accumulate the per-record counts for this key.
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// Instead of just result count, I need something like {"city":{"new york" :2, "london":1}}
context.write(key, result);
}
有什么想法吗?
您可以使用reducer 的cleanup()
方法来实现这一点(假设您只有一个reducer)。它在 reduce 任务结束时调用一次。
我将针对 "city" 数据进行解释。
代码如下:
package com.hadooptests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class Cities {
public static class CityMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
outKey.set(value);
context.write(outKey, outValue);
}
}
public static class CityReducer
extends Reducer<Text,IntWritable,Text,Text> {
HashMap<String, Integer> cityCount = new HashMap<String, Integer>();
public void reduce(Text key, Iterable<IntWritable>values,
Context context
) throws IOException, InterruptedException {
for (IntWritable val : values) {
String keyStr = key.toString();
if(keyStr.toLowerCase().startsWith("city|")) {
String[] tokens = keyStr.split("\|");
if(cityCount.containsKey(tokens[1])) {
int count = cityCount.get(tokens[1]);
cityCount.put(tokens[1], ++count);
}
else
cityCount.put(tokens[1], val.get());
}
}
}
@Override
public void cleanup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException,
InterruptedException
{
String output = "{\"city\":{";
Iterator iterator = cityCount.entrySet().iterator();
while(iterator.hasNext())
{
Map.Entry entry = (Map.Entry) iterator.next();
output = output.concat("\"" + entry.getKey() + "\":" + Integer.toString((Integer) entry.getValue()) + ", ");
}
output = output.substring(0, output.length() - 2);
output = output.concat("}}");
context.write(output, "");
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "KeyValue");
job.setJarByClass(Cities.class);
job.setMapperClass(CityMapper.class);
job.setReducerClass(CityReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/in/in.txt"));
FileOutputFormat.setOutputPath(job, new Path("/out/"));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
映射器:
- 它只是输出它遇到的每个键的计数。例如如果遇到记录"city|new york",那么它会输出(key, value) as ("city|new york", 1)
Reducer:
- 对于每条记录,它检查键是否以 "city|" 开头。然后按管道符("|")拆分该键,并把每个城市的计数存入 HashMap。
- Reducer 也覆盖了
cleanup
方法。一旦 reduce 任务结束,就会调用此方法。在此任务中,将 HashMap 的内容组合成所需的输出。
- 在
cleanup()
中,key作为HashMap的内容输出,value作为空字符串输出。
例如我将以下数据作为输入:
city|new york
city|London
city|new york
city|new york
city|Paris
city|Paris
我得到以下输出:
{"city":{"London":1, "new york":3, "Paris":2}}
很简单。
使用 "city" 作为输出键并使用整个记录作为输出值从映射器发出。
你可以在 reducer 中把 city 归为一组,travel 归为另一组。
然后使用 HashMap 分别统计 city 和 travel 各取值的实例数,以便细化到更低层级。
我有带前缀的词。例如:
city|new york
city|London
travel|yes
...
city|new york
我想数一下有多少city|new york
和city|London
(这是经典的wordcount)。但是,reducer 输出应该是像 city:{"new york" :2, "london":1}
这样的键值对。对于每个 city
前缀的含义,我想聚合所有字符串及其计数。
// Classic word-count reduce: sums the 1s emitted by the mapper for this key.
// NOTE(review): `result` is a field declared elsewhere in the enclosing
// reducer class (presumably an IntWritable) — not visible in this snippet.
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
// Accumulate the per-record counts for this key.
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
// Instead of just result count, I need something like {"city":{"new york" :2, "london":1}}
context.write(key, result);
}
有什么想法吗?
您可以使用reducer 的cleanup()
方法来实现这一点(假设您只有一个reducer)。它在 reduce 任务结束时调用一次。
我将针对 "city" 数据进行解释。
代码如下:
package com.hadooptests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class Cities {
public static class CityMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
outKey.set(value);
context.write(outKey, outValue);
}
}
public static class CityReducer
extends Reducer<Text,IntWritable,Text,Text> {
HashMap<String, Integer> cityCount = new HashMap<String, Integer>();
public void reduce(Text key, Iterable<IntWritable>values,
Context context
) throws IOException, InterruptedException {
for (IntWritable val : values) {
String keyStr = key.toString();
if(keyStr.toLowerCase().startsWith("city|")) {
String[] tokens = keyStr.split("\|");
if(cityCount.containsKey(tokens[1])) {
int count = cityCount.get(tokens[1]);
cityCount.put(tokens[1], ++count);
}
else
cityCount.put(tokens[1], val.get());
}
}
}
@Override
public void cleanup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException,
InterruptedException
{
String output = "{\"city\":{";
Iterator iterator = cityCount.entrySet().iterator();
while(iterator.hasNext())
{
Map.Entry entry = (Map.Entry) iterator.next();
output = output.concat("\"" + entry.getKey() + "\":" + Integer.toString((Integer) entry.getValue()) + ", ");
}
output = output.substring(0, output.length() - 2);
output = output.concat("}}");
context.write(output, "");
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "KeyValue");
job.setJarByClass(Cities.class);
job.setMapperClass(CityMapper.class);
job.setReducerClass(CityReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/in/in.txt"));
FileOutputFormat.setOutputPath(job, new Path("/out/"));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
映射器:
- 它只是输出它遇到的每个键的计数。例如如果遇到记录"city|new york",那么它会输出(key, value) as ("city|new york", 1)
Reducer:
- 对于每条记录,它检查键是否以 "city|" 开头。然后按管道符("|")拆分该键,并把每个城市的计数存入 HashMap。
- Reducer 也覆盖了
cleanup
方法。一旦 reduce 任务结束,就会调用此方法。在此任务中,将 HashMap 的内容组合成所需的输出。 - 在
cleanup()
中,key作为HashMap的内容输出,value作为空字符串输出。
例如我将以下数据作为输入:
city|new york
city|London
city|new york
city|new york
city|Paris
city|Paris
我得到以下输出:
{"city":{"London":1, "new york":3, "Paris":2}}
很简单。
使用 "city" 作为输出键并使用整个记录作为输出值从映射器发出。
你可以在 reducer 中把 city 归为一组,travel 归为另一组。
然后使用 HashMap 分别统计 city 和 travel 各取值的实例数,以便细化到更低层级。