Apache Kafka Streams:乱序消息
Apache Kafka Streams : Out-of-Order messages
我有一个写入主题 A (TA) 的 Apache Kafka 2.6 Producer。
我还有一个 Kafka 流应用程序,它从 TA 消费并写入主题 B (TB)。
在流应用程序中,我有一个自定义时间戳提取器,它从消息有效负载中提取时间戳。
对于我的一个故障处理测试用例,我在我的应用程序 运行.
时关闭了 Kafka 集群
当生产者应用程序尝试将消息写入 TA 时,它不能,因为集群已关闭,因此(我假设)缓冲了消息。
假设它按递增的时间顺序接收到 4 条消息 m1、m2、m3、m4。 (即 m1 在前,m4 在后)。
当我将 Kafka 集群重新联机时,生产者将缓冲的消息发送到主题,但它们没有按顺序发送。例如,我收到 m2,然后是 m3,然后是 m1,然后是 m4。
这是为什么?是不是因为producer中的buffer是多线程的,每个都同时produce到topic?
我假设自定义时间戳提取器有助于在使用消息时对消息进行排序。但他们没有。或者我对时间戳提取器的理解是错误的。
我从 SO 那里得到了一个解决方案,将所有事件从 tA 流式传输到另一个中间主题(比如 tA'),这将使用时间戳提取器到另一个主题。但我不确定这是否会导致事件根据提取的时间戳重新排序。
我的 Producer 代码如下所示(我使用 Spring Cloud 来创建 Producer):
Producer.java
@Service
public class Producer {
private String topicName = "input-topic";
private ApplicationProperties appProps;
@Autowired
private KafkaTemplate<String, MyEvent> kafkaTemplate;
public Producer() {
super();
}
@Autowired
public void setAppProps(ApplicationProperties appProps) {
this.appProps = appProps;
this.topicName = appProps.getInput().getTopicName();
}
public void sendMessage(String key, MyEvent ce) {
ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName, key, ce);
}
}
Why is that ? Is it because the buffering in the producer is multi-threaded with each producing to the topic at the same time ?
默认情况下,生产者允许向代理发送最多 5 个并行的运行中请求,因此如果某些请求失败并重试,请求顺序可能会改变。
为避免此重新排序问题,您可以设置 max.in.flight.requests.per.connection = 1
(可能会影响性能)或设置 enable.idempotence = true
.
顺便说一句:你没有说你的话题是单分区还是多分区,你的消息有没有key?如果您的主题有多个分区,并且您的消息被发送到不同的分区,则无论如何都无法保证读取的顺序,因为偏移量排序仅在一个分区内得到保证。
I assumed that the custom timestamp extractor would help in ordering messages when consuming them. But they do not. Or maybe my understanding of the timestamp extractor is wrong.
时间戳提取器仅提取时间戳。 Kafka Streams 不会重新排序任何消息,但始终以偏移顺序处理消息。
If not, then what are the specific uses of the timestamp extractor ? Just to associate a timestamp with an event ?
正确。
I got one solution from SO here, to just stream all events from tA to another intermediate topic (say tA') which will use the TimeStamp extractor to another topic. But I am not sure if this will cause the events to get reordered based on the extracted timestamp.
不,它不会进行任何重新排序。另一个 SO 问题只是要更改时间戳,但是如果您按 a、b、c 的顺序读取消息,结果将按 a、b、c 的顺序写入(只是时间戳不同,但应保留偏移顺序)。
这个演讲解释了更多细节:https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
我有一个写入主题 A (TA) 的 Apache Kafka 2.6 Producer。 我还有一个 Kafka 流应用程序,它从 TA 消费并写入主题 B (TB)。 在流应用程序中,我有一个自定义时间戳提取器,它从消息有效负载中提取时间戳。
对于我的一个故障处理测试用例,我在我的应用程序 运行.
时关闭了 Kafka 集群当生产者应用程序尝试将消息写入 TA 时,它不能,因为集群已关闭,因此(我假设)缓冲了消息。 假设它按递增的时间顺序接收到 4 条消息 m1、m2、m3、m4。 (即 m1 在前,m4 在后)。
当我将 Kafka 集群重新联机时,生产者将缓冲的消息发送到主题,但它们没有按顺序发送。例如,我收到 m2,然后是 m3,然后是 m1,然后是 m4。
这是为什么?是不是因为producer中的buffer是多线程的,每个都同时produce到topic?
我假设自定义时间戳提取器有助于在使用消息时对消息进行排序。但他们没有。或者我对时间戳提取器的理解是错误的。
我从 SO
我的 Producer 代码如下所示(我使用 Spring Cloud 来创建 Producer): Producer.java
@Service
public class Producer {
private String topicName = "input-topic";
private ApplicationProperties appProps;
@Autowired
private KafkaTemplate<String, MyEvent> kafkaTemplate;
public Producer() {
super();
}
@Autowired
public void setAppProps(ApplicationProperties appProps) {
this.appProps = appProps;
this.topicName = appProps.getInput().getTopicName();
}
public void sendMessage(String key, MyEvent ce) {
ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName, key, ce);
}
}
Why is that ? Is it because the buffering in the producer is multi-threaded with each producing to the topic at the same time ?
默认情况下,生产者允许向代理发送最多 5 个并行的运行中请求,因此如果某些请求失败并重试,请求顺序可能会改变。
为避免此重新排序问题,您可以设置 max.in.flight.requests.per.connection = 1
(可能会影响性能)或设置 enable.idempotence = true
.
顺便说一句:你没有说你的话题是单分区还是多分区,你的消息有没有key?如果您的主题有多个分区,并且您的消息被发送到不同的分区,则无论如何都无法保证读取的顺序,因为偏移量排序仅在一个分区内得到保证。
I assumed that the custom timestamp extractor would help in ordering messages when consuming them. But they do not. Or maybe my understanding of the timestamp extractor is wrong.
时间戳提取器仅提取时间戳。 Kafka Streams 不会重新排序任何消息,但始终以偏移顺序处理消息。
If not, then what are the specific uses of the timestamp extractor ? Just to associate a timestamp with an event ?
正确。
I got one solution from SO here, to just stream all events from tA to another intermediate topic (say tA') which will use the TimeStamp extractor to another topic. But I am not sure if this will cause the events to get reordered based on the extracted timestamp.
不,它不会进行任何重新排序。另一个 SO 问题只是要更改时间戳,但是如果您按 a、b、c 的顺序读取消息,结果将按 a、b、c 的顺序写入(只是时间戳不同,但应保留偏移顺序)。
这个演讲解释了更多细节:https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/