数据流 2.x 抱怨调用 PCollectionTuple.apply() 时参数类型不正确

Dataflow 2.x complaining about incorrect parameter type on call to PCollectionTuple.apply()

我正在将现有管道迁移到数据流 2.x。在管道的最后阶段,数据被写入 Google 云服务。数据需要压缩为 .gz,因此之前(在我们的数据流 1.x 实现中)我们编写了自己的 Sink 来为我们做这件事。在数据流 2.x 中,有一种内置的方法可以做到这一点。我得到了正确的代码,但是 java 编译器抱怨 TextIO.write() 返回了错误的类型。代码如下:

PCollectionTuple results = /* some transforms */

// write main result
results.get(mainOutputTag).
apply("WriteProfile", TextIO.write().to(outputBucket)
.withSuffix(".json")        
.withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
.withNumShards(numChunks));

Java 的编译器报错,出现以下错误:

The method apply(String, PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (String, TextIO.Write)

有人看到我上面的代码可能有什么问题吗?如果您需要更多上下文,请告诉我。

我最终解决了这个问题。问题是我试图将 PCollection<TableRow> 写入文件,其中只能将 PCollection<String> 写入文件。

这是我的最终解决方案:

PCollectionTuple results = /* some transforms */

// write main result
results.get(mainOutputTag) /* PCollection<TableRow> */

    .apply(ParDo.of(new DoFn<TableRow, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().toString());
        }
    })) /* PCollection<String> */

    .apply("WriteProfile", TextIO.write().to(outputBucket)
    .withSuffix(".json")        
    .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
    .withNumShards(numChunks));