Google Cloud Dataflow:无法使用 TextIO.Read 解析原型

Google Cloud Dataflow: Can't parse proto using TextIO.Read

这是我的代码

PCollection<MyProto> pCollection = p.apply(TextIO.Read.from(
            "gs://my_bucket/*")
            .withCoder(Proto2Coder.of(MyProto.class)));

但这失败并出现错误

Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).

本地下载的文件可以正常解析。

我也尝试过使用 StringUtf8Coder 和 ByteArrayCoder 来做同样的事情,但没有骰子。

有什么帮助吗?我不应该使用 TextIO 吗?我还有哪些其他选择?

TextIO 将文件分成几行并将编码器应用于每一行。自然,这不适用于非基于行的格式。我想您的文件每个都包含一个序列化原型,对吗?在这种情况下,您有 2 个选择:

  • 创建您自己的来源和 Reader 类(参见 generic documentation on creating sources and sinks) by subclassing FileBasedFormat
  • 将处理所有文件的行为视为一个 ParDo - 创建一个内存中的 PCollection,其中包含要处理的文件名(使用 Create.of()),并通过一个 ParDo 来获取文件名并解析文件作为原型缓冲区;然后通过管道传输到管道的其余部分。

第二种更简单,但如果您有很多文件,第一种会更好。