使用元素计数通过 Dataflow 写入 GCS

Writing to GCS with Dataflow using element count

这是参考 Apache Beam SDK 版本 2.2.0。

我正在尝试使用 AfterPane.elementCountAtLeast(...),但到目前为止还没有成功。我想要的看起来很像,但是需要适配到2.2.0。最终,我只需要一个简单的 OR,在 X 元素或 Y 时间过去后写入文件。我打算将时间设置得非常高,以便在大多数情况下写入发生在元素数量上,并且仅在消息量非常低的时候根据持续时间进行写入。

使用 GCP Dataflow 2.0 PubSub to GCS 作为参考,这是我尝试过的:

String bucketPath =
    String.format("gs://%s/%s", 
        options.getBucketName(), 
        options.getDestinationDirName());

PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
        .discardingFiredPanes());

windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());

其中 stringMessages 是一个从 Avro 编码的 pubsub 订阅中读取的 PCollection。上游发生了一些解包以将事件转换为字符串,但没有 merging/partitioning/grouping,只是转换。

元素数量硬编码为 250,仅用于 PoC。一旦得到证实,它很可能会提高到 10s 或 100s 的数千范围。

问题

此实现产生了各种长度的文本文件。当作业第一次启动时(大概是处理积压的数据,然后在某个时候稳定下来),文件长度开始非常高(1000 个元素)。我尝试将 'numShards' 更改为 1 和 10。在 1 时,写入文件的元素数稳定在600个,10个稳定在300个。

我在这里错过了什么?

As a side note, this is only step 1. Once I figure out writing using element count, I still need to figure out writing these files as compressed json (.json.gz) as opposed to plain-text files.

把自己学到的贴出来,供大家参考。

我写这篇文章时不清楚的是 Apache Beam Documentation 中的以下内容:

Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis

有了这些知识,我重新考虑了我的管道。从Writing files下的FileIO documentation -> How many shards are generated per pane:

Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.

所以我决定使用 FileIOwriteDynamic 来执行写入并指定 withNumShards 以获得隐式 GroupByKey。最终结果如下所示:

PCollection<String> windowedValues = validMessageStream.apply(Window
            .<String>configure()
            .triggering(Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(2000),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                            Duration.standardSeconds(windowDurationSeconds)))))
            .discardingFiredPanes());

windowedValues.apply(FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://data_pipeline_events_test/events/")
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(1)
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));