GCP Pubsub 中的消息丢失和重复
Message lost and duplicates in GCP Pubsub
我 运行 遇到从 Dataflow 读取 GCP PubSub 的问题,当在短时间内发布大量消息时,Dataflow 将收到大部分已发送的消息,除了一些消息会丢失,和一些其他消息将被复制。最奇怪的是,丢失消息的数量将与被复制的消息数量完全相同。
在其中一个例子中,我在 5 秒内发送了 4,000 条消息,总共收到了 4,000 条消息,但丢失了 9 条消息,正好有 9 条消息重复。
我确定重复项的方法是通过日志记录。我正在记录发布到 Pubsub 的每条消息以及 pubsub 生成的消息 ID。我还在 Pardo 转换中从 PubsubIO 读取后立即记录消息。
我在 Dataflow 中从 Pubsub 读取的方式是使用 org.apache.beam.sdk.ioPubsubIO
:
public interface Options extends GcpOptions, DataflowPipelineOptions {
// PUBSUB URL
@Description("Pubsub URL")
@Default.String("https://pubsub.googleapis.com")
String getPubsubRootUrl();
void setPubsubRootUrl(String value);
// TOPIC
@Description("Topic")
@Default.String("projects/test-project/topics/test_topic")
String getTopic();
void setTopic(String value);
...
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
options.setRunner(DataflowRunner.class);
...
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
.apply("Logging data coming out of Pubsub", ParDo
.of(some_logging_transformation)
)
.apply("Saving data into db", ParDo
.of(some_output_transformation)
)
;
pipeline.run().waitUntilFinish();
}
我想知道这是否是 Pubsub 或 PubsubIO 中的已知问题?
更新:
使用 pubsub 模拟器尝试了 4000 个请求,没有丢失数据也没有重复
更新#2:
我又进行了一些实验,发现复制消息正在从丢失的消息中获取 message_id
。因为问题的方向已经从它的起源转移了很多,我决定 post 另一个问题,包含详细的日志以及我用来发布和接收消息的代码。
link 到新问题:
我和 PubSub 团队的一个 Google 人谈过。这似乎是由 Python 客户端的线程安全问题引起的。 Google
的回复请参考 的已接受答案
我 运行 遇到从 Dataflow 读取 GCP PubSub 的问题,当在短时间内发布大量消息时,Dataflow 将收到大部分已发送的消息,除了一些消息会丢失,和一些其他消息将被复制。最奇怪的是,丢失消息的数量将与被复制的消息数量完全相同。
在其中一个例子中,我在 5 秒内发送了 4,000 条消息,总共收到了 4,000 条消息,但丢失了 9 条消息,正好有 9 条消息重复。
我确定重复项的方法是通过日志记录。我正在记录发布到 Pubsub 的每条消息以及 pubsub 生成的消息 ID。我还在 Pardo 转换中从 PubsubIO 读取后立即记录消息。
我在 Dataflow 中从 Pubsub 读取的方式是使用 org.apache.beam.sdk.ioPubsubIO
:
public interface Options extends GcpOptions, DataflowPipelineOptions {
// PUBSUB URL
@Description("Pubsub URL")
@Default.String("https://pubsub.googleapis.com")
String getPubsubRootUrl();
void setPubsubRootUrl(String value);
// TOPIC
@Description("Topic")
@Default.String("projects/test-project/topics/test_topic")
String getTopic();
void setTopic(String value);
...
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
options.setRunner(DataflowRunner.class);
...
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
.apply("Logging data coming out of Pubsub", ParDo
.of(some_logging_transformation)
)
.apply("Saving data into db", ParDo
.of(some_output_transformation)
)
;
pipeline.run().waitUntilFinish();
}
我想知道这是否是 Pubsub 或 PubsubIO 中的已知问题?
更新: 使用 pubsub 模拟器尝试了 4000 个请求,没有丢失数据也没有重复
更新#2:
我又进行了一些实验,发现复制消息正在从丢失的消息中获取 message_id
。因为问题的方向已经从它的起源转移了很多,我决定 post 另一个问题,包含详细的日志以及我用来发布和接收消息的代码。
link 到新问题:
我和 PubSub 团队的一个 Google 人谈过。这似乎是由 Python 客户端的线程安全问题引起的。 Google
的回复请参考