GCP Pubsub 中的消息丢失和重复

Message lost and duplicates in GCP Pubsub

我 运行 遇到从 Dataflow 读取 GCP PubSub 的问题,当在短时间内发布大量消息时,Dataflow 将收到大部分已发送的消息,除了一些消息会丢失,和一些其他消息将被复制。最奇怪的是,丢失消息的数量将与被复制的消息数量完全相同。

在其中一个例子中,我在 5 秒内发送了 4,000 条消息,总共收到了 4,000 条消息,但丢失了 9 条消息,正好有 9 条消息重复。

我确定重复项的方法是通过日志记录。我正在记录发布到 Pubsub 的每条消息以及 pubsub 生成的消息 ID。我还在 Pardo 转换中从 PubsubIO 读取后立即记录消息。

我在 Dataflow 中从 Pubsub 读取的方式是使用 org.apache.beam.sdk.ioPubsubIO:

public interface Options extends GcpOptions, DataflowPipelineOptions {

    // PUBSUB URL
    @Description("Pubsub URL")
    @Default.String("https://pubsub.googleapis.com")
    String getPubsubRootUrl();
    void setPubsubRootUrl(String value);

    // TOPIC
    @Description("Topic")
    @Default.String("projects/test-project/topics/test_topic")
    String getTopic();
    void setTopic(String value);

...
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);


    options.setStreaming(true);
    options.setRunner(DataflowRunner.class);

    ...

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(PubsubIO
                 .<String>read()
                 .topic(options.getTopic())
                 .withCoder(StringUtf8Coder.of())
            )

            .apply("Logging data coming out of Pubsub", ParDo
                .of(some_logging_transformation)
            )

            .apply("Saving data into db", ParDo
                .of(some_output_transformation)
            )
            ;


    pipeline.run().waitUntilFinish();


}

我想知道这是否是 Pubsub 或 PubsubIO 中的已知问题?

更新: 使用 pubsub 模拟器尝试了 4000 个请求,没有丢失数据也没有重复

更新#2:

我又进行了一些实验,发现复制消息正在从丢失的消息中获取 message_id。因为问题的方向已经从它的起源转移了很多,我决定 post 另一个问题,包含详细的日志以及我用来发布和接收消息的代码。 link 到新问题:

我和 PubSub 团队的一个 Google 人谈过。这似乎是由 Python 客户端的线程安全问题引起的。 Google

的回复请参考 的已接受答案