如何使用 Google 数据流输出 <key,value> 对中的所有值,按键分组

How to output all values from a <key,value> pair, grouped by key, using Google Dataflow

我正在尝试做一些看起来相对简单但 运行 遇到一些困难的事情。

我有一堆文字,每一行都是一个值。我分析每一行文本,创建适当的密钥,然后发出 KV 对。然后我使用 GroupByKey 转换。最后,我想输出现在按键分组的所有文本(如果我能为每个键获得一个文本文件则加分,但我不确定这是否可能)。

这是管道的 apply 的样子:

    public PCollection<String> apply(PCollection<String> generator) {

        // Returns individuals lines of text as <String,String> KV pairs
        PCollection<KV<String,String>> generatedTextKV = generator.apply(
                ParDo.of(new GeneratorByLineFn()));

        // Groups the <String,String> KV pairs by value
        PCollection<KV<String, Iterable<String>>> groupedText = generatedTextKV.apply(
            GroupByKey.<String, String>create());

        // Hopefully returns output where all of each key's values are together
        PCollection<String> results = groupedText.apply(ParDo.of(new FormatOutputFn()));

        return results;
    }

不幸的是,我无法让 FormatOutputFn() 正常工作。

迭代Iterable<String>并输出每个值并不能保证键值分组(如果我对此有误,请纠正我,然后我的问题就解决了)。然后我尝试使用 StringBuilder(),它适用于小型数据集,但不出所料地在较大数据的日志中生成 java.lang.OutOfMemoryError: Java heap space 错误。我还尝试了 Flatten.FlattenIterables 转换,但这也不起作用,因为 K、V 对中的值不是 PCollection,而只是常规的 Iterable.

我已经看到 this question on analysis by common key,但是根据回答,我并不完全清楚我应该如何处理我的情况。我想我必须使用 Combine.PerKey,但我不确定如何使用它。我还假设必须有一种预烘焙的方式来执行此操作,但我在文档中找不到这种预烘焙的方式。我确定我没有找对地方。

而且,如上所述,如果有一种方法可以获取文本文件输出,其中文本文件的名称是键,值都在文件中,那将是惊人的。但我不认为 Dataflow 可以做到这一点(还?)。

感谢您的阅读。

Dataflow 目前不支持对 PCollections 进行排序的任何概念。你是对的,不能保证 'results' 有一个顺序,包括键分组。我们希望在某个时候为 PCollections 添加排序属性,但具体时间尚不清楚。

由于底层实现细节的原因,某些运行器在某些情况下可能看起来有顺序。例如,如果 FormatOutputFn 与写入步骤融合,那么您可能会看到分组,因为每个 KV<K, Iterable<V>> 被处理成多个 <K,V>,这些 <K,V> 在下一个 KV<K, Iterable<V>> 之前写入文件被处理。但同样,这只是 Dataflow 选择优化此特定案例的方式的产物,不应普遍依赖。

如您所知,如果单个元素可以放入内存,您可以让 FormatOutputFn 将每个 KV<K, Iterable<V>> 转换为包含多个换行符的单个字符串。

鉴于此处情况并非如此,我能想到的最佳解决方案是手动编写文件——因此 FormatOutputFn 获取每个 KV<K, Iterable<V>> 并使用标准 GCS 库打开一个名为 K 和将 Iterable<V> 写入其中。坏消息是这有点棘手,因为您需要了解我们的容错语义如何重试元素。但好消息是,我们目前正在开发库来帮助简化这些类型的自定义接收器。

至于零长度文件,这里有一个很棒的答案: