Map/Reduce:完成后如何输出Hashmap?
Map/Reduce:How to output Hashmap after completion?
我想实现DPC算法(通过快速搜索和发现密度峰进行聚类)。这是一项艰巨的工作,所以我决定从 Rho 的计算开始。
地图在这里:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lineSplit = line.split(" ");
if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
IntWritable one = new IntWritable(
Integer.parseInt(lineSplit[0]));
IntWritable two = new IntWritable(
Integer.parseInt(lineSplit[1]));
context.write(one, two);
}
}
这里是 Reducer:
public void reduce(IntWritable key, IntWritable values, Context context)
throws IOException, InterruptedException {
int[] indexs = new int[2];
indexs[0] = Integer.parseInt(key.toString());
indexs[1] = Integer.parseInt(values.toString());
for (int i = 0; i < indexs.length; i++) {
densityCountMap.put(indexs[i],
densityCountMap.get(indexs[i]) + 1);
}
}
问题
densityCountMap是一个hash map,只有完成后才能正确。如何输出densityCountMap?以什么方式?
------------解决方案--------
感谢 mbaxi,你真的启发了我,因为你提到 reduce 定义不正确并且 densityCountMap 不是必需的。
我应该说得更清楚一点,目标是如果 lineSplit[2] 低于某个阈值,则 lineSplit[0] 和 lineSplit[1] 都会增加。其实不需要重写cleanup.
映射器:
public static class TokenizerMapper extends
Mapper<LongWritable, Text, IntWritable, IntWritable> {
private final static IntWritable count = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lineSplit = line.split(" ");
if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
IntWritable one = new IntWritable(
Integer.parseInt(lineSplit[0]));
IntWritable two = new IntWritable(
Integer.parseInt(lineSplit[1]));
context.write(one, count);// Both should be increased
context.write(two, count);// both as key
}
}
}
减速器:
public static class IntSumReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);//densityCountMap is redundant if having known better the structure of Map/reduce
context.write(key, result);//it equals to output densityCountMap
}
}
再次感谢,你带来的不仅仅是帮助,更是灵感。
您可以覆盖 cleanup(Context context) 方法,继续在 reduce() 方法中填充您的 densityCountMap 并flush/write在 cleanup(Context context) 方法中将内容写入磁盘。
cleanup() 在处理完所有行后调用。
---根据评论部分的要求进行编辑---
如果您使用的是 Eclipse 编辑器,请右键单击您正在扩展的 Reducer class,然后单击 Source->Override/Implement Methods,否则您可以查找 javadocs。
private static class RhoCalculationReducer extends Reducer<Text,Text,Text,Text> {
}
在那里你会看到以下方法的列表[请注意输入 parameters/datatypes 可能会根据你的 class 定义而改变] -
cleanup(Context)
reduce(Text, Iterable<Text>, Context)
run(Context)
setup(Context)
您的 reduce() 或 map() 函数实际上是重写的实现,您可以在其中提供自己的处理逻辑。
setup() 和 cleanup() 可以被认为分别类似于 map 或 reduce 任务的构造函数或析构函数。
setup() 在 reduce 任务映射开始之前调用,cleanup() 在任务结束时调用。
我还看到你的 reduce 定义不正确,而不是“IntWritable values”应该是“可迭代值",
对于 reducer,确保单个键的值由单个 reducer 处理,这就是签名采用键和可迭代值列表的原因。
可能您还想将来自单个键的记录聚合在一起,并且可能不需要额外的 densityCountMap,因为 reducer 已经负责一次性提取给定键的所有值。
我想实现DPC算法(通过快速搜索和发现密度峰进行聚类)。这是一项艰巨的工作,所以我决定从 Rho 的计算开始。
地图在这里:
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lineSplit = line.split(" ");
if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
IntWritable one = new IntWritable(
Integer.parseInt(lineSplit[0]));
IntWritable two = new IntWritable(
Integer.parseInt(lineSplit[1]));
context.write(one, two);
}
}
这里是 Reducer:
public void reduce(IntWritable key, IntWritable values, Context context)
throws IOException, InterruptedException {
int[] indexs = new int[2];
indexs[0] = Integer.parseInt(key.toString());
indexs[1] = Integer.parseInt(values.toString());
for (int i = 0; i < indexs.length; i++) {
densityCountMap.put(indexs[i],
densityCountMap.get(indexs[i]) + 1);
}
}
问题
densityCountMap是一个hash map,只有完成后才能正确。如何输出densityCountMap?以什么方式?
------------解决方案--------
感谢 mbaxi,你真的启发了我,因为你提到 reduce 定义不正确并且 densityCountMap 不是必需的。
我应该说得更清楚一点,目标是如果 lineSplit[2] 低于某个阈值,则 lineSplit[0] 和 lineSplit[1] 都会增加。其实不需要重写cleanup.
映射器:
public static class TokenizerMapper extends
Mapper<LongWritable, Text, IntWritable, IntWritable> {
private final static IntWritable count = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] lineSplit = line.split(" ");
if (Double.parseDouble(lineSplit[2]) < dcThreshold) {
IntWritable one = new IntWritable(
Integer.parseInt(lineSplit[0]));
IntWritable two = new IntWritable(
Integer.parseInt(lineSplit[1]));
context.write(one, count);// Both should be increased
context.write(two, count);// both as key
}
}
}
减速器:
public static class IntSumReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);//densityCountMap is redundant if having known better the structure of Map/reduce
context.write(key, result);//it equals to output densityCountMap
}
}
再次感谢,你带来的不仅仅是帮助,更是灵感。
您可以覆盖 cleanup(Context context) 方法,继续在 reduce() 方法中填充您的 densityCountMap 并flush/write在 cleanup(Context context) 方法中将内容写入磁盘。
cleanup() 在处理完所有行后调用。
---根据评论部分的要求进行编辑---
如果您使用的是 Eclipse 编辑器,请右键单击您正在扩展的 Reducer class,然后单击 Source->Override/Implement Methods,否则您可以查找 javadocs。
private static class RhoCalculationReducer extends Reducer<Text,Text,Text,Text> {
}
在那里你会看到以下方法的列表[请注意输入 parameters/datatypes 可能会根据你的 class 定义而改变] -
cleanup(Context)
reduce(Text, Iterable<Text>, Context)
run(Context)
setup(Context)
您的 reduce() 或 map() 函数实际上是重写的实现,您可以在其中提供自己的处理逻辑。 setup() 和 cleanup() 可以被认为分别类似于 map 或 reduce 任务的构造函数或析构函数。 setup() 在 reduce 任务映射开始之前调用,cleanup() 在任务结束时调用。
我还看到你的 reduce 定义不正确,而不是“IntWritable values”应该是“可迭代值", 对于 reducer,确保单个键的值由单个 reducer 处理,这就是签名采用键和可迭代值列表的原因。 可能您还想将来自单个键的记录聚合在一起,并且可能不需要额外的 densityCountMap,因为 reducer 已经负责一次性提取给定键的所有值。