MapReduce:将 Reducer 的结果分组为固定大小的块
MapReduce: Group results of Reducer into chunks of fixed size
我正在使用 Map Reduce 框架。
假设这是输入列表[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P ,Q, R, S, T, U, V, W, X, Y, Z]
我的映射器产生以下输出:
<"Key 1" : A>
<"Key 2" : B>
<"Key 3" : C>
<"Key 1" : D>
<"Key 2" : E>
<"Key 3" : F>
<"Key 1" : G>
<"Key 2" : H>
<"Key 3" : I>
<"Key 1" : J>
<"Key 2" : K>
<"Key 3" : L>
<"Key 1" : M>
<"Key 2" : N>
<"Key 3" : O>
<"Key 1" : P>
<"Key 2" : Q>
<"Key 3" : R>
<"Key 1" : S>
<"Key 2" : T>
<"Key 3" : U>
<"Key 1" : V>
<"Key 2" : W>
<"Key 3" : X>
<"Key 1" : Y>
<"Key 2" : Z>
现在 Reducer 输出通常是这样的:
<"Key 1" : A, D, G, J, M, P, S, V, Y>
<"Key 2" : B, E, H, K, N, Q, T, W, Z>
<"Key 3" : C, F, I, L, O, R, U, X>
但是我想做的是这样的:
我想将每个键的输出组合成 3 个块,然后生成最终的 Reducer 输出。
所以我希望我的 Reducer 输出看起来像这样:
<"Key 1" : [A, D, G], [J, M, P], [S, V, Y]>
<"Key 2" : [B, E, H], [K, N, Q], [T, W, Z]>
<"Key 3" : [C, F, I], [L, O, R], [U, X]>
任何帮助将不胜感激,因为两天以来我一直被困在这个问题上。我无法弄清楚最后一部分,即如何将输出分组为 3 个块。
P.S. 如果块大小小于 3(就像在最后一个键的示例中一样)那么没问题,但不应超过 3。
是的,在您的情况下,您可以使用 ArrayWritable 作为缩减器值 class 将值写入固定大小的块。
你可以做的是,
维护一个固定大小为 3 的实例数组列表变量
减速器 class.
在您的 reduce() 中,遍历给定键的值列表,
并将其添加到数组列表中。
如果数组列表的大小达到3,那么就把它转换成
ArrayWritable 实例并将其传递给带有键的 write() 然后重置
数组列表。
在您的作业中将 outformat 值 class 声明为 ArrayWritable
配置
我觉得,做起来很简单:
- 在你的 reducer 中,每次只取 3 个值到一个 for 循环中。
将这三个与您选择的分隔符连接起来并写入上下文
context.write(键,值)
Please note that you can write to context as many time you wish, i.e.
for each chunk of 3 output simply write to context and then take next
set of 3 values.
如果您发现任何困难,请告诉我。
更复杂的解决方案可能是使用 MultiOutputs。您甚至可以使用它写入不同的文件。
一个很好的例子是 here 使用 hadoop 1.0.2
以下是取自 javadocs 的示例:
Usage in Reducer:
<K, V> String generateFileName(K k, V v) {
return k.toString() + "_" + v.toString();
}
public class MOReduce extends
Reducer<WritableComparable, Writable,WritableComparable, Writable> {
private MultipleOutputs mos;
public void setup(Context context) {
...
mos = new MultipleOutputs(context);
}
public void reduce(WritableComparable key, Iterator<Writable> values,
Context context)
throws IOException {
...
mos.write("text", , key, new Text("Hello"));
mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
...
}
public void cleanup(Context) throws IOException {
mos.close();
...
}
}
我正在使用 Map Reduce 框架。
假设这是输入列表[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P ,Q, R, S, T, U, V, W, X, Y, Z]
我的映射器产生以下输出:
<"Key 1" : A>
<"Key 2" : B>
<"Key 3" : C>
<"Key 1" : D>
<"Key 2" : E>
<"Key 3" : F>
<"Key 1" : G>
<"Key 2" : H>
<"Key 3" : I>
<"Key 1" : J>
<"Key 2" : K>
<"Key 3" : L>
<"Key 1" : M>
<"Key 2" : N>
<"Key 3" : O>
<"Key 1" : P>
<"Key 2" : Q>
<"Key 3" : R>
<"Key 1" : S>
<"Key 2" : T>
<"Key 3" : U>
<"Key 1" : V>
<"Key 2" : W>
<"Key 3" : X>
<"Key 1" : Y>
<"Key 2" : Z>
现在 Reducer 输出通常是这样的:
<"Key 1" : A, D, G, J, M, P, S, V, Y>
<"Key 2" : B, E, H, K, N, Q, T, W, Z>
<"Key 3" : C, F, I, L, O, R, U, X>
但是我想做的是这样的:
我想将每个键的输出组合成 3 个块,然后生成最终的 Reducer 输出。
所以我希望我的 Reducer 输出看起来像这样:
<"Key 1" : [A, D, G], [J, M, P], [S, V, Y]>
<"Key 2" : [B, E, H], [K, N, Q], [T, W, Z]>
<"Key 3" : [C, F, I], [L, O, R], [U, X]>
任何帮助将不胜感激,因为两天以来我一直被困在这个问题上。我无法弄清楚最后一部分,即如何将输出分组为 3 个块。
P.S. 如果块大小小于 3(就像在最后一个键的示例中一样)那么没问题,但不应超过 3。
是的,在您的情况下,您可以使用 ArrayWritable 作为缩减器值 class 将值写入固定大小的块。
你可以做的是,
维护一个固定大小为 3 的实例数组列表变量 减速器 class.
在您的 reduce() 中,遍历给定键的值列表, 并将其添加到数组列表中。
如果数组列表的大小达到3,那么就把它转换成
ArrayWritable 实例并将其传递给带有键的 write() 然后重置 数组列表。在您的作业中将 outformat 值 class 声明为 ArrayWritable
配置
我觉得,做起来很简单:
- 在你的 reducer 中,每次只取 3 个值到一个 for 循环中。
将这三个与您选择的分隔符连接起来并写入上下文
context.write(键,值)
Please note that you can write to context as many time you wish, i.e. for each chunk of 3 output simply write to context and then take next set of 3 values.
如果您发现任何困难,请告诉我。
更复杂的解决方案可能是使用 MultiOutputs。您甚至可以使用它写入不同的文件。
一个很好的例子是 here 使用 hadoop 1.0.2
以下是取自 javadocs 的示例:
Usage in Reducer:
<K, V> String generateFileName(K k, V v) {
return k.toString() + "_" + v.toString();
}
public class MOReduce extends
Reducer<WritableComparable, Writable,WritableComparable, Writable> {
private MultipleOutputs mos;
public void setup(Context context) {
...
mos = new MultipleOutputs(context);
}
public void reduce(WritableComparable key, Iterator<Writable> values,
Context context)
throws IOException {
...
mos.write("text", , key, new Text("Hello"));
mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
...
}
public void cleanup(Context) throws IOException {
mos.close();
...
}
}