Writing to GCS with Dataflow using element count
This is in reference to Apache Beam SDK version 2.2.0.
I'm attempting to use AfterPane.elementCountAtLeast(...), but with no success so far. What I want looks a lot like that, but it needs to be adapted to 2.2.0. Ultimately, I just need a simple OR: a file is written after X elements or after Y time has passed, whichever comes first. I intend to set the time very high so that, in most cases, the write happens on element count, and writes happen based on duration only during periods of very low message volume.
Using GCP Dataflow 2.0 PubSub to GCS as a reference, here is what I've tried:
String bucketPath =
    String.format("gs://%s/%s",
        options.getBucketName(),
        options.getDestinationDirName());

PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
            .discardingFiredPanes());

windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());
where stringMessages is a PCollection that reads from an Avro-encoded Pub/Sub subscription. There is some unpacking upstream to turn the events into strings, but no merging/partitioning/grouping, just transforms.
The element count is hard-coded at 250 just for PoC purposes. Once proven out, it will likely be cranked up into the tens or hundreds of thousands.
The problem
This implementation has produced text files of various lengths. When the job first starts up, the file lengths start out very high (1000s of elements), presumably while it processes backlogged data, and then stabilize at some point. I've tried changing 'numShards' to 1 and 10. At 1, the element count of the written files stabilizes at 600; at 10, it stabilizes at 300.
What am I missing here?
As a side note, this is only step 1. Once I figure out writing using element count, I still need to figure out writing these files as compressed JSON (.json.gz) as opposed to plain-text files.
Posting what I learned here for others' reference.
What wasn't clear to me when I wrote this was the following, from the Apache Beam documentation:

    Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis
With that knowledge, I rethought my pipeline a bit. From the FileIO documentation, under Writing files -> How many shards are generated per pane:

    Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.
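That extra GroupByKey also explains the file sizes I was seeing: elementCountAtLeast(N) is only a lower bound on when a pane may fire, and whatever the fired pane contains is then split across numShards. A plain-Java sketch of that split (the pane size of 3000 is hypothetical, just to make the arithmetic visible):

```java
import java.util.ArrayList;
import java.util.List;

public class ShardSplit {
    // Round-robin a fired pane's elements across numShards,
    // mimicking the redistribution that the implicit GroupByKey performs.
    static List<List<Integer>> split(int paneSize, int numShards) {
        List<List<Integer>> shards = new ArrayList<>();
        for (int s = 0; s < numShards; s++) {
            shards.add(new ArrayList<>());
        }
        for (int i = 0; i < paneSize; i++) {
            shards.get(i % numShards).add(i);
        }
        return shards;
    }

    public static void main(String[] args) {
        // A pane that fires with 3000 buffered elements and 10 shards
        // yields files of roughly paneSize / numShards elements each,
        // not the trigger's element-count threshold.
        for (List<Integer> shard : split(3000, 10)) {
            System.out.println(shard.size());
        }
    }
}
```

So the per-file element count is governed by how full the pane happens to be when it fires, divided by the shard count, which matches the varying file lengths I observed.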
So I decided to use FileIO's writeDynamic to perform the writes, and to specify withNumShards in order to get the implicit GroupByKey. The end result looks like this:
PCollection<String> windowedValues = validMessageStream.apply(Window
    .<String>configure()
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(2000),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
            Duration.standardSeconds(windowDurationSeconds)))))
    .discardingFiredPanes());

windowedValues.apply(FileIO.<String, String>writeDynamic()
    .by(Event::getKey)
    .via(TextIO.sink())
    .to("gs://data_pipeline_events_test/events/")
    .withDestinationCoder(StringUtf8Coder.of())
    .withNumShards(1)
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
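On the .json.gz follow-up from the question: as far as I can tell, SDK versions newer than 2.2.0 add a withCompression(Compression.GZIP) option to FileIO.Write and TextIO.Write (treat the exact API and version as an assumption on my part). The gzip framing itself is just the standard one from java.util.zip, which a sink like that would apply per file:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipJson {
    // Gzip a JSON payload the way a .json.gz file sink would frame it.
    static byte[] gzip(String json) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    // Decompress, e.g. what a downstream reader of the bucket would do.
    static String gunzip(byte[] data) throws Exception {
        try (GZIPInputStream gz =
                new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gz.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"event\":\"test\"}";
        // Round-trip: compressed bytes decompress back to the original JSON.
        System.out.println(gunzip(gzip(json)).equals(json));
    }
}
```

If you're stuck on 2.2.0, the fallback is compressing in your own sink implementation with exactly this GZIPOutputStream wrapping; on newer SDKs the one-liner option on the write transform should cover it.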