Google Cloud Dataflow (Apache Beam) - 如何使用 header 处理 gzip 压缩的 csv 文件?

Google Cloud Dataflow (Apache Beam) - how to process gzipped csv files with a header?

我在 GCS 中有 csv(gzip 压缩)文件。我想读取这些文件并将数据发送到 BigQuery。

header 信息可以更改(虽然我提前知道所有列),所以只删除 header 是不够的,不知何故我需要阅读第一行并附加列剩余行的信息。

怎么可能?

我首先认为我必须实现这样的自定义源 post。
Reading CSV header with Dataflow
但是使用这个解决方案,我不确定如何先解压缩 Gzip。我可以像 TextIO 一样使用 withCompressionType 吗? (我找到了一个参数 compression_type in a python Class 但我正在使用 Java 并且在 Java FileBasedSource class 中找不到类似的参数。)

另外我觉得这有点过分了,因为它使文件不可分割(尽管在我的情况下没问题)。

或者我可以使用 GoogleCloudStorage 并直接在我的 main() 函数中首先读取文件及其第一行,然后进入管道。

但这也很麻烦,所以我想确认是否有任何最佳实践(Dataflow 方式)在 Dataflow 中利用 header 读取 csv 文件?

如果我正确理解您要完成的任务,SideInput (doc, example) 可能就是答案。它将允许 header 可用于处理文件的每一行。

总体思路是将 header 作为单独的 PCollectionView 发出,并将其用作 per-line 处理的 SideInput。您可以使用 SideOutput (doc)

对文件进行一次传递来实现此目的

如果我没看错你的问题,听起来你的 header 内容因文件而异。如果是这样,您可以使用 View.asMap 从每个文件中保留 headers 的映射。不幸的是,目前本机不支持跟踪正在读取的当前文件名,但是 this post 中讨论了 work-arrounds。