MapReduce:将 Reducer 的结果分组为固定大小的块

MapReduce: Group results of Reducer into chunks of fixed size

我正在使用 Map Reduce 框架。

假设这是输入列表[A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P ,Q, R, S, T, U, V, W, X, Y, Z] 我的映射器产生以下输出:

<"Key 1" : A>
<"Key 2" : B>
<"Key 3" : C>
<"Key 1" : D>
<"Key 2" : E>
<"Key 3" : F>
<"Key 1" : G>
<"Key 2" : H>
<"Key 3" : I>
<"Key 1" : J>
<"Key 2" : K>
<"Key 3" : L>
<"Key 1" : M>
<"Key 2" : N>
<"Key 3" : O>
<"Key 1" : P>
<"Key 2" : Q>
<"Key 3" : R>
<"Key 1" : S>
<"Key 2" : T>
<"Key 3" : U>
<"Key 1" : V>
<"Key 2" : W>
<"Key 3" : X>
<"Key 1" : Y>
<"Key 2" : Z>

现在 Reducer 输出通常是这样的:

<"Key 1" : A, D, G, J, M, P, S, V, Y>
<"Key 2" : B, E, H, K, N, Q, T, W, Z>
<"Key 3" : C, F, I, L, O, R, U, X>

但是我想做的是这样的:

我想将每个键的输出组合成 3 个块,然后生成最终的 Reducer 输出。

所以我希望我的 Reducer 输出看起来像这样:

<"Key 1" : [A, D, G], [J, M, P], [S, V, Y]>
<"Key 2" : [B, E, H], [K, N, Q], [T, W, Z]>
<"Key 3" : [C, F, I], [L, O, R], [U, X]>

任何帮助将不胜感激,因为两天以来我一直被困在这个问题上。我无法弄清楚最后一部分,即如何将输出分组为 3 个块。

P.S. 如果块大小小于 3(就像在最后一个键的示例中一样)那么没问题,但不应超过 3。

是的,在您的情况下,您可以使用 ArrayWritable 作为缩减器值 class 将值写入固定大小的块。

你可以做的是,

  • 维护一个固定大小为 3 的实例数组列表变量 减速器 class.

  • 在您的 reduce() 中,遍历给定键的值列表, 并将其添加到数组列表中。

  • 如果数组列表的大小达到3,那么就把它转换成
    ArrayWritable 实例并将其传递给带有键的 write() 然后重置 数组列表。

  • 在您的作业中将 outformat 值 class 声明为 ArrayWritable
    配置

我觉得,做起来很简单:

  1. 在你的 reducer 中,每次只取 3 个值到一个 for 循环中。
  2. 将这三个与您选择的分隔符连接起来并写入上下文

    context.write(键,值)

Please note that you can write to context as many time you wish, i.e. for each chunk of 3 output simply write to context and then take next set of 3 values.

如果您发现任何困难,请告诉我。

更复杂的解决方案可能是使用 MultiOutputs。您甚至可以使用它写入不同的文件。

一个很好的例子是 here 使用 hadoop 1.0.2

以下是取自 javadocs 的示例:

Usage in Reducer:

 <K, V> String generateFileName(K k, V v) {
   return k.toString() + "_" + v.toString();
 }

 public class MOReduce extends
   Reducer<WritableComparable, Writable,WritableComparable, Writable> {
 private MultipleOutputs mos;
 public void setup(Context context) {
 ...
 mos = new MultipleOutputs(context);
 }

 public void reduce(WritableComparable key, Iterator<Writable> values,
 Context context)
 throws IOException {
 ...
 mos.write("text", , key, new Text("Hello"));
 mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
 mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
 mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
 ...
 }

 public void cleanup(Context) throws IOException {
 mos.close();
 ...
 }

 }