如何一次处理一个 GCS 文件模式,一个完整的文件?

How to process a GCS filepattern, full file at a time?

我需要处理一个 (GCS) 文件桶,其中每个文件都经过压缩并包含一条多行 JSON 记录。此外,正在处理的文件的名称很重要,我需要在我的转换中知道它。

从文档中的示例开始,TextIO 看起来非常接近,但它看起来设计用于逐行处理每个文件,并且不允许我一次读取整个文件。另外,我没有看到任何方法来获取正在处理的文件名?

PCollectionTuple results = p.apply(TextIO.Read
    .from("gs://bucket/a/*.gz")
    .withCompressionType(TextIO.CompressionType.GZIP)
    .withCoder(MyJsonCoder.of()))

看来我需要编写自定义 IO reader 或类似的东西?关于最佳起点的任何提示?

你是对的,现有 类 中的 none 正在做你想做的事情。有 2 种合理的方法:

  • 自己匹配文件模式(使用 IOChannelUtils and IOChannelFactory)并将生成的文件包装到 PCollection<String> 中,其中字符串将是文件名,使用 Create.of(filenames)。然后应用一个 ParDo 和一个读取给定文件名的函数。
  • 编写您自己的 Source 子类(还有 FileBasedSource,但它不太适合您的用例)。它将由 filepattern 配置,splitIntoBundles 将匹配 filepattern 并扩展为单独的源,每个源对应一个文件。

我会推荐第一种方法,因为它看起来代码更少,而且您的用例不需要 Source 的全部功能。