使用 CombineFn 从所有节点累积数据后,合并每个键的所有值
Combine all values per key after accumulating data from all nodes using CombineFn
我想在每个键的基础上迭代 KV pCollection 的值。我使用下面的代码使用自定义 class,
进行组合
PCollection<KV<String, String>> combinesAttributes =
valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
new CombineAttributes()));
下面是我的自定义组合 class,
public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> {
@Override
public String apply(Iterable<String> input) {...}..}
这对于小输入来说工作得很好,但对于大输入来说,组合并不像预期的那样。输出只组合了一个键的几个值,其他值都丢失了。我假设输出只有来自一个节点的组合数据。
https://cloud.google.com/dataflow/model/combine 中的文档提到使用 CombineFn 来组合所有节点中每个键的完整值集合。
但是当我如下更改自定义组合函数时,出现以下错误,
incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String>
组合函数
public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> {
public static class Accum {
List<String> inputList = new ArrayList<String>();
}
public Accum createAccumulator() { return new Accum(); }
public Accum addInput(Accum accum, Iterable<String> input) {
for (String item : input) {
accum.inputList.add(item);
}
return accum;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
for (String item : accum.inputList) {
merged.inputList.add(item);
}
}
return merged;
}
public String extractOutput(Accum accum) {
return "";
}
}
没有可用于扩展 CombineFn
的 combine perKey 的示例代码。请让我知道上面的代码有什么问题。
如果您只想遍历所有值,您可以使用 GroupByKey
将 PCollection<KV<K, V>>
变成 PCollection<KV<K, Iterable<V>>
。然后你可以编写一个 DoFn
来处理其中的每个元素,并在内部迭代 Iterable<V>
.
请注意,您只会收到与同一 window 中的键关联的所有值。如果您使用默认的全局 window,那将是所有值。
Combine
和 CombineFn
在您想要将所有值组合成较小的输出时最有用。例如,如果您想求所有值的总和或平均值,使用 Sum.perKey()
或 Mean.perKey()
会更有效。效率来自能够传递(和合并)累加器。在 Sum
的情况下,这对应于部分和。
举个例子,管道 运行s 在两台机器上。第一台机器处理 KV<user1, attr1a>, KV<user1, attr1b>, KV<user2, attr2a>
,第二台机器处理 KV<user1, attr1c>, KV<user2, attr2b>
.
CombineAttributes
(无论是哪种实现方式)将首先在每台机器上调用。所以它可以将 [attr1a, attr1b]
组合成一个字符串或累加器(比如 attr1a+attr1b
)。然后它会在另一台机器上 运行 将 [attr1c]
组合成 attr1c
。然后它将合并所有这些部分结果以获得最终累加器 -- attr1a+attr1b+attr1c
。在最初的实施情况下,这将是最终答案。在后者中,将在此累加器上调用 extractOutput
。
我想在每个键的基础上迭代 KV pCollection 的值。我使用下面的代码使用自定义 class,
进行组合PCollection<KV<String, String>> combinesAttributes =
valExtract.get(extAttUsers).apply(Combine.<String, String>perKey(
new CombineAttributes()));
下面是我的自定义组合 class,
public static class CombineAttributes implements SerializableFunction<Iterable<String>, String> {
@Override
public String apply(Iterable<String> input) {...}..}
这对于小输入来说工作得很好,但对于大输入来说,组合并不像预期的那样。输出只组合了一个键的几个值,其他值都丢失了。我假设输出只有来自一个节点的组合数据。
https://cloud.google.com/dataflow/model/combine 中的文档提到使用 CombineFn 来组合所有节点中每个键的完整值集合。
但是当我如下更改自定义组合函数时,出现以下错误,
incompatible types: CombineAttributes cannot be converted to com.google.cloud.dataflow.sdk.transforms.SerializableFunction<java.lang.Iterable<java.lang.String>,java.lang.String>
组合函数
public static class CombineAttributes extends CombineFn<Iterable<String>, CombineAttributes.Accum, String> {
public static class Accum {
List<String> inputList = new ArrayList<String>();
}
public Accum createAccumulator() { return new Accum(); }
public Accum addInput(Accum accum, Iterable<String> input) {
for (String item : input) {
accum.inputList.add(item);
}
return accum;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
for (String item : accum.inputList) {
merged.inputList.add(item);
}
}
return merged;
}
public String extractOutput(Accum accum) {
return "";
}
}
没有可用于扩展 CombineFn
的 combine perKey 的示例代码。请让我知道上面的代码有什么问题。
如果您只想遍历所有值,您可以使用 GroupByKey
将 PCollection<KV<K, V>>
变成 PCollection<KV<K, Iterable<V>>
。然后你可以编写一个 DoFn
来处理其中的每个元素,并在内部迭代 Iterable<V>
.
请注意,您只会收到与同一 window 中的键关联的所有值。如果您使用默认的全局 window,那将是所有值。
Combine
和 CombineFn
在您想要将所有值组合成较小的输出时最有用。例如,如果您想求所有值的总和或平均值,使用 Sum.perKey()
或 Mean.perKey()
会更有效。效率来自能够传递(和合并)累加器。在 Sum
的情况下,这对应于部分和。
举个例子,管道 运行s 在两台机器上。第一台机器处理 KV<user1, attr1a>, KV<user1, attr1b>, KV<user2, attr2a>
,第二台机器处理 KV<user1, attr1c>, KV<user2, attr2b>
.
CombineAttributes
(无论是哪种实现方式)将首先在每台机器上调用。所以它可以将 [attr1a, attr1b]
组合成一个字符串或累加器(比如 attr1a+attr1b
)。然后它会在另一台机器上 运行 将 [attr1c]
组合成 attr1c
。然后它将合并所有这些部分结果以获得最终累加器 -- attr1a+attr1b+attr1c
。在最初的实施情况下,这将是最终答案。在后者中,将在此累加器上调用 extractOutput
。