在运行时读取多个文件(数据流模板)

Read multiple files at runtime (dataflow template)

我正在尝试构建数据流模板。

目标是读取 ValueProvider,它会告诉我要读取哪些文件。 然后对于每个文件,我需要使用对象读取和丰富数据。 我试过这个:

        p.apply(Create.of(options.getScheduleBatch()))
            .apply(ParDo.of(StringScheduleBatchToFileReceivedFn.of()))
            .apply(ParDo.of(new DoFn<FileReceived, PCollection<EventRow>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    FileReceived fileReceived = c.element();
                    Broker broker = configuration.getBroker(fileReceived.getBrokerId());
                    PCollection<EventRow> eventRows = p
                            .apply(TextIO.read().from(fileReceived.getUri()))
                            .apply(ParDo.of(StringToEventRowFn.of(broker, fileReceived, options.getJobName())));
                    c.output(eventRows);
                }
            }));

但是我有以下错误:

从 CoderRegistry 推断编码器失败:无法为 org.apache.beam.sdk.values.PCollection 提供编码器。

我很想找到比自己使用 gcs 客户端读取文件更好的方法。

你有什么建议吗?

此致

问题:

您正在尝试发出 PCollection 作为 ParDo 的输出。这行不通。

详情:

PCollection 是一种抽象,表示可能无界的元素集合。对 PCollection 应用转换会得到另一个 PCollection。您可以应用的转换之一是 ParDoParDos 进行逐元素变换。应用 ParDo 时,您表示 - "take this PCollection and make another one by converting all elements within it by applying that ParDo".

使处理有效的其中一件事是能够并行执行所有内容,例如在多个执行节点(例如 VMs/machines)上一次转换大量元素,通过 运行 相同的 ParDo 在每个节点上针对不同的元素。而且你无法明确控制任何特定的转换是否会发生在同一个执行节点或另一个执行节点上,这是底层系统设计的一部分,如何优化它。但是要实现这一点,您必须能够在执行节点之间潜在地传递元素并将它们持久化以进行聚合。 Beam 通过要求您为元素实现 Coders 来支持这一点。编码器是一种序列化机制,允许 Beam 将元素(由 java 对象表示)转换为字节数组,然后可以将其传递给下一个转换(可能发生在另一台机器上)或存储。例如,Beam 需要能够对您从 ParDo 输出的元素进行编码。 Beam 知道如何序列化某些类型,但它无法自动推断所有内容,您必须为无法推断的内容提供编码器。

您的示例如下所示:取一些 PCollection,然后通过对每个元素应用 ParDo 将其转换为另一个 PCollection,然后 ParDo 转换每个元素输入元素到 PCollection。这意味着一旦元素被 ParDo 处理,您就必须对其进行编码并将其传递给下一个转换。这里的问题是 - 你如何编码并传递一个(可能是无界的)PCollection 到下一个转换或保留它以进行聚合?

Beam 目前不支持此功能,因此您需要选择其他方法。

在你的具体情况下,我不确定在开箱即用的 Beam 中你是否可以简单地使用文件名流并将它们转换成子管道来处理文件中的行。

解决方法:

我能想到的绕过此限制的方法很少:

  • 如果您的文件名有已知模式,您可以在 TextIO 中指定模式,它可以在新文件到达时读取它们。

  • 如果它们没有已知模式,您可以编写另一个管道来重命名文件名,以便它们具有通用名称模式,然后使用 TextIO 中的模式另一个管道。

  • 如果可行(例如文件适合内存),您可能可以使用纯 java File API 读取文件内容,将它们分成行和在单个 ParDo 中发出这些行。那么就可以在下面的ParDo.

  • 中应用同样的StringToEventRowFn

希望这对您有所帮助