排空从 PubSub 读取并写入 Google Cloud Storage 的 Dataflow 作业时数据丢失

Data loss when draining Dataflow job that reads from PubSub and writes to Google Cloud Storage

将固定数量的字符串(800,000 个 1KB 用于测试)放入 PubSub 主题并 运行在 Dataflow 中执行以下 Apache Beam (2.1.0) 作业时,正好按预期保留语义.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGsSimpleJob {

    public static void main(String[] args) {
        PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(PubSubToGsPipelineOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
                .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
                .apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
        p.run();
    }

}

PipelineOptions 下面的实现

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface PubSubToGsPipelineOptions extends PipelineOptions {
    @Description("PubSub subscription")
    String getInput();
    void setInput(String input);

    @Description("Google Cloud Storage output path")
    String getOutput();
    void setOutput(String output);
}

但是,如果相同的作业是 运行,在读取所有元素之前排空(如 Dataflow 控制台中所示),然后再次启动,则输出文件的记录数少于原始数据集已发布到 PubSub 主题。这表明耗尽和替换作业会导致数据丢失,这看起来很奇怪,因为 this google cloud blog post 提到 Drain and replace 应该至少有一次语义。应该如何设计这个流水线以在排空和替换作业时实现至少一次语义(或更好但恰好一次语义)?

我的猜测是 window 可能在耗尽之前被部分写入,替换作业会用剩余的 window 覆盖它。您可以检查此日志行 in WriteFiles 的耗尽作业和替换作业的工作日志。如果您使用 Beam HEAD,它还会记录最终目的地被覆盖的情况。

从概念上讲,耗尽工作和替代工作是完全不同的管道。使用相同的输出位置与对其他两个不相关的作业使用相同的输出位置没有什么不同。您可以尝试的另一件事是对第二个作业使用不同的输出路径,并验证两个目录中是否存在所有记录。