Spring Cloud Stream Kafka - 最终一致性 - Kafka 是否自动重试未确认的消息(使用 autocommitoffset=false 时)
Spring Cloud Stream Kafka - Eventual consistency - Does Kafka auto retry unacknowledged messages (when using autocommitoffset=false)
事实证明,实施最终一致的分布式架构是一件痛苦的事情。有大量的博客文章讲述了如何做到这一点,但没有展示(代码)如何实际做到这一点。
我遇到的一个方面是必须处理消息未被确认时的手动重试。
例如:我的订单服务向 Kafka 发送一个支付事件。支付服务订阅并处理它,回答支付成功或支付失败
要求付款:Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service
付款成功:-> Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service
支付失败 -> Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service
重点是:
我确定消息何时通过同步发送传送到 Kafka。但是,我必须知道付款已由付款服务处理的唯一方法是期待一个应答事件(付款正常 | 付款失败)。
这迫使我在订单服务器中实现重试机制。如果一段时间后仍未得到答复,请使用新的支付事件重试。
此外,这也迫使我处理支付服务中的重复消息,以防它们实际被处理但订单服务没有得到答复。
我想知道 Kafka 是否有一个内置机制来在消费者不确认消息的新偏移量时发送重试。
在Spring Cloud Stream中我们可以将autoCommitOffset
属性设置为false并处理消费者中偏移量的ack:
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
如果我们不执行会怎样 acknowledgment.acknowledge();
消息会不会被Kafka自动重发给这个消费者?
如果可能的话,我们就不需要再手动重试了,可以这样做:
付款人服务:
@Autowired
private PaymentBusiness paymentBusiness;
@StreamListener(Sink.INPUT)
public void process(Order order) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
paymentBusiness(order);
//If we don't get here because of an exception
//Kafka would retry...
acknowledgment.acknowledge();
}
}
如果可以的话,Kafka 中的重试周期是如何配置的?
在最坏的情况下(也是最有可能的),这不受支持,我们将不得不手动重试。您知道 Spring Cloud Stream 应用程序使用 Kafka 处理最终一致性的真实示例吗?
What happens if we don't execute acknowledgment.acknowledge(); Will the message be automatically resent by Kafka to this consumer?
没有。只要客户端打开,Kafka 消费者就会按顺序读取消息。 Kafka 不支持更复杂的确认模式,例如单独的消息确认,只更新给定消费者组的偏移量和partition-topic。 Spring Cloud Stream 支持手动确认 Spring Cloud Stream 中的消息,用于异步处理消息的场景(从而防止消息丢失) - 但假设一旦消息被手动确认,其偏移量将被保存,因此来自同一 partition-topic 的所有先前消息都将被视为 'read'。如果您想挑出失败的消息,您可以使用 DLQ 支持 - 并让后续的消费者接收它们。重新启动消费者将从上次保存的偏移量恢复读取,因此您可以选择不为一系列未成功处理的消息保存偏移量。
Spring Cloud Stream 消费者有 built-in 重试和 DLQ 支持 - 请参阅 http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_properties as well as retry settings provided as part of the default consumer properties: http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_consumer_properties
中的 enableDlq
事实证明,实施最终一致的分布式架构是一件痛苦的事情。有大量的博客文章讲述了如何做到这一点,但没有展示(代码)如何实际做到这一点。
我遇到的一个方面是必须处理消息未被确认时的手动重试。
例如:我的订单服务向 Kafka 发送一个支付事件。支付服务订阅并处理它,回答支付成功或支付失败
要求付款:
Order Service ----Pay event----> Kafka ----Pay Event ----> Payment Service
付款成功:->
Payment Service ----Payment ok event ----> Kafka ----Payment ok Event ----> Order Service
支付失败 ->
Payment Service ----Payment failure event ----> Kafka ----Payment failure Event ----> Order Service
重点是:
我确定消息何时通过同步发送传送到 Kafka。但是,我必须知道付款已由付款服务处理的唯一方法是期待一个应答事件(付款正常 | 付款失败)。
这迫使我在订单服务器中实现重试机制。如果一段时间后仍未得到答复,请使用新的支付事件重试。
此外,这也迫使我处理支付服务中的重复消息,以防它们实际被处理但订单服务没有得到答复。
我想知道 Kafka 是否有一个内置机制来在消费者不确认消息的新偏移量时发送重试。
在Spring Cloud Stream中我们可以将autoCommitOffset
属性设置为false并处理消费者中偏移量的ack:
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
如果我们不执行会怎样 acknowledgment.acknowledge();
消息会不会被Kafka自动重发给这个消费者?
如果可能的话,我们就不需要再手动重试了,可以这样做:
付款人服务:
@Autowired
private PaymentBusiness paymentBusiness;
@StreamListener(Sink.INPUT)
public void process(Order order) {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
paymentBusiness(order);
//If we don't get here because of an exception
//Kafka would retry...
acknowledgment.acknowledge();
}
}
如果可以的话,Kafka 中的重试周期是如何配置的?
在最坏的情况下(也是最有可能的),这不受支持,我们将不得不手动重试。您知道 Spring Cloud Stream 应用程序使用 Kafka 处理最终一致性的真实示例吗?
What happens if we don't execute acknowledgment.acknowledge(); Will the message be automatically resent by Kafka to this consumer?
没有。只要客户端打开,Kafka 消费者就会按顺序读取消息。 Kafka 不支持更复杂的确认模式,例如单独的消息确认,只更新给定消费者组的偏移量和partition-topic。 Spring Cloud Stream 支持手动确认 Spring Cloud Stream 中的消息,用于异步处理消息的场景(从而防止消息丢失) - 但假设一旦消息被手动确认,其偏移量将被保存,因此来自同一 partition-topic 的所有先前消息都将被视为 'read'。如果您想挑出失败的消息,您可以使用 DLQ 支持 - 并让后续的消费者接收它们。重新启动消费者将从上次保存的偏移量恢复读取,因此您可以选择不为一系列未成功处理的消息保存偏移量。
Spring Cloud Stream 消费者有 built-in 重试和 DLQ 支持 - 请参阅 http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_kafka_consumer_properties as well as retry settings provided as part of the default consumer properties: http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR2/reference/htmlsingle/#_consumer_properties
中的enableDlq