如何使用 Google 数据流输出 <key,value> 对中的所有值,按键分组
How to output all values from a <key,value> pair, grouped by key, using Google Dataflow
我正在尝试做一些看起来相对简单但 运行 遇到一些困难的事情。
我有一堆文字,每一行都是一个值。我分析每一行文本,创建适当的密钥,然后发出 KV 对。然后我使用 GroupByKey
转换。最后,我想输出现在按键分组的所有文本(如果我能为每个键获得一个文本文件则加分,但我不确定这是否可能)。
这是管道的 apply
的样子:
public PCollection<String> apply(PCollection<String> generator) {
// Returns individuals lines of text as <String,String> KV pairs
PCollection<KV<String,String>> generatedTextKV = generator.apply(
ParDo.of(new GeneratorByLineFn()));
// Groups the <String,String> KV pairs by value
PCollection<KV<String, Iterable<String>>> groupedText = generatedTextKV.apply(
GroupByKey.<String, String>create());
// Hopefully returns output where all of each key's values are together
PCollection<String> results = groupedText.apply(ParDo.of(new FormatOutputFn()));
return results;
}
不幸的是,我无法让 FormatOutputFn()
正常工作。
迭代Iterable<String>
并输出每个值并不能保证键值分组(如果我对此有误,请纠正我,然后我的问题就解决了)。然后我尝试使用 StringBuilder()
,它适用于小型数据集,但不出所料地在较大数据的日志中生成 java.lang.OutOfMemoryError: Java heap space
错误。我还尝试了 Flatten.FlattenIterables
转换,但这也不起作用,因为 K、V 对中的值不是 PCollection
,而只是常规的 Iterable
.
我已经看到 this question on analysis by common key,但是根据回答,我并不完全清楚我应该如何处理我的情况。我想我必须使用 Combine.PerKey
,但我不确定如何使用它。我还假设必须有一种预烘焙的方式来执行此操作,但我在文档中找不到这种预烘焙的方式。我确定我没有找对地方。
而且,如上所述,如果有一种方法可以获取文本文件输出,其中文本文件的名称是键,值都在文件中,那将是惊人的。但我不认为 Dataflow 可以做到这一点(还?)。
感谢您的阅读。
Dataflow 目前不支持对 PCollections 进行排序的任何概念。你是对的,不能保证 'results' 有一个顺序,包括键分组。我们希望在某个时候为 PCollections 添加排序属性,但具体时间尚不清楚。
由于底层实现细节的原因,某些运行器在某些情况下可能看起来有顺序。例如,如果 FormatOutputFn 与写入步骤融合,那么您可能会看到分组,因为每个 KV<K, Iterable<V>>
被处理成多个 <K,V>
,这些 <K,V>
在下一个 KV<K, Iterable<V>>
之前写入文件被处理。但同样,这只是 Dataflow 选择优化此特定案例的方式的产物,不应普遍依赖。
如您所知,如果单个元素可以放入内存,您可以让 FormatOutputFn 将每个 KV<K, Iterable<V>>
转换为包含多个换行符的单个字符串。
鉴于此处情况并非如此,我能想到的最佳解决方案是手动编写文件——因此 FormatOutputFn 获取每个 KV<K, Iterable<V>>
并使用标准 GCS 库打开一个名为 K 和将 Iterable<V>
写入其中。坏消息是这有点棘手,因为您需要了解我们的容错语义如何重试元素。但好消息是,我们目前正在开发库来帮助简化这些类型的自定义接收器。
至于零长度文件,这里有一个很棒的答案:
我正在尝试做一些看起来相对简单但 运行 遇到一些困难的事情。
我有一堆文字,每一行都是一个值。我分析每一行文本,创建适当的密钥,然后发出 KV 对。然后我使用 GroupByKey
转换。最后,我想输出现在按键分组的所有文本(如果我能为每个键获得一个文本文件则加分,但我不确定这是否可能)。
这是管道的 apply
的样子:
public PCollection<String> apply(PCollection<String> generator) {
// Returns individuals lines of text as <String,String> KV pairs
PCollection<KV<String,String>> generatedTextKV = generator.apply(
ParDo.of(new GeneratorByLineFn()));
// Groups the <String,String> KV pairs by value
PCollection<KV<String, Iterable<String>>> groupedText = generatedTextKV.apply(
GroupByKey.<String, String>create());
// Hopefully returns output where all of each key's values are together
PCollection<String> results = groupedText.apply(ParDo.of(new FormatOutputFn()));
return results;
}
不幸的是,我无法让 FormatOutputFn()
正常工作。
迭代Iterable<String>
并输出每个值并不能保证键值分组(如果我对此有误,请纠正我,然后我的问题就解决了)。然后我尝试使用 StringBuilder()
,它适用于小型数据集,但不出所料地在较大数据的日志中生成 java.lang.OutOfMemoryError: Java heap space
错误。我还尝试了 Flatten.FlattenIterables
转换,但这也不起作用,因为 K、V 对中的值不是 PCollection
,而只是常规的 Iterable
.
我已经看到 this question on analysis by common key,但是根据回答,我并不完全清楚我应该如何处理我的情况。我想我必须使用 Combine.PerKey
,但我不确定如何使用它。我还假设必须有一种预烘焙的方式来执行此操作,但我在文档中找不到这种预烘焙的方式。我确定我没有找对地方。
而且,如上所述,如果有一种方法可以获取文本文件输出,其中文本文件的名称是键,值都在文件中,那将是惊人的。但我不认为 Dataflow 可以做到这一点(还?)。
感谢您的阅读。
Dataflow 目前不支持对 PCollections 进行排序的任何概念。你是对的,不能保证 'results' 有一个顺序,包括键分组。我们希望在某个时候为 PCollections 添加排序属性,但具体时间尚不清楚。
由于底层实现细节的原因,某些运行器在某些情况下可能看起来有顺序。例如,如果 FormatOutputFn 与写入步骤融合,那么您可能会看到分组,因为每个 KV<K, Iterable<V>>
被处理成多个 <K,V>
,这些 <K,V>
在下一个 KV<K, Iterable<V>>
之前写入文件被处理。但同样,这只是 Dataflow 选择优化此特定案例的方式的产物,不应普遍依赖。
如您所知,如果单个元素可以放入内存,您可以让 FormatOutputFn 将每个 KV<K, Iterable<V>>
转换为包含多个换行符的单个字符串。
鉴于此处情况并非如此,我能想到的最佳解决方案是手动编写文件——因此 FormatOutputFn 获取每个 KV<K, Iterable<V>>
并使用标准 GCS 库打开一个名为 K 和将 Iterable<V>
写入其中。坏消息是这有点棘手,因为您需要了解我们的容错语义如何重试元素。但好消息是,我们目前正在开发库来帮助简化这些类型的自定义接收器。
至于零长度文件,这里有一个很棒的答案: