Kafka / RabbitMQ 中的每条消息确认
per message acknowledgement in Kafka / RabbitMQ
我们有一个可用的 rabbitmq .implementation,由于数量原因,我们计划切换到 kafka。
有一点我有疑问。
在 rabbitMQ 中,当消费者使用来自 Q 的消息时,消息进入不同的阶段,未确认阶段。 client/consumer 需要一些时间来处理消息,在成功处理后,它会向 Q 发送确认,并且消息会从 Q 中删除。如果不成功,在定义的时间后如果 Q 没有得到确认,则消息附加在 Q 的末尾。这样我们就不会丢失任何消息。
根据我对 Kafka 的了解,我知道如果例如消息 100 没有被成功处理,偏移量不会增加,但是如果消息 101 被成功处理它会增加。所以我丢了消息100.
有没有办法保证 none 的消息会丢失。
除非您轮询新消息,否则您的消息偏移量不会增加。因此,您必须担心重新处理您的消息。
如果要将数据处理的结果存储到Kafka集群中,可以使用transaction feature of Kafka。这样你就可以支持exactly once delivery。您的所有更改都将被保存,或者 none 将被存储。
另一种方法是使您的处理场景幂等。您将为 Kafka 中的每条消息分配一个唯一 ID。处理消息时,将 ID 存储在数据库中。崩溃后,您通过查看数据库检查您的消息 ID 是否已被处理。
Kafka 不会从主题中删除消息,除非它到达 log.retention.bytes
log.retention.hours
log.retention.minutes
log.retention.ms
配置之一。因此,如果 offset 增加,您不会丢失以前的消息,您可以简单地将 offset 更改为您想要的位置。
您应该稍微了解一下 Kafka 中消息消费的工作原理。这是官方 Kafka 文档的消费者部分的 link:https://kafka.apache.org/documentation/#theconsumer
基本上,在 Kafka 中,只有经过足够长的时间后才会删除消息,这是使用 log.retention.hours
、log.retention.minutes
和 log.retention.ms
配置的,就像@Amin 所说的那样。
在 Kafka 中,任何数量的消费者都可以随时开始消费来自任何主题的消息,而不管其他消费者是否已经在消费来自同一主题的消息。 Kafka 使用存储在 Kafka 本身中的偏移量跟踪每个消费者在每个 topic/partition 上的位置。因此,如果您的消费者需要使用消息 100,就像您在问题中描述的那样,您可以简单地 "rewind" 到所需的消息,然后再次开始正常使用。无论您之前是否使用过它,或者其他消费者是否正在阅读该主题,都没有关系。
来自 Kafka 官方文档:
A consumer can deliberately rewind back to an old offset and
re-consume data. This violates the common contract of a queue, but
turns out to be an essential feature for many consumers. For example,
if the consumer code has a bug and is discovered after some messages
are consumed, the consumer can re-consume those messages once the bug
is fixed.
我也遇到了同样的问题。如果我想用一种简单的方式表达,RabbitMQ 会计算每个
- 已发布但未消费
- 已发布、已使用和未确认的消息。
Kafka没有,所以你不能现成的,你必须自己实现它。
虽然有选项,使用kmq,性能会变得不到50%,看看
https://softwaremill.com/kafka-with-selective-acknowledgments-performance/
我们有一个可用的 rabbitmq .implementation,由于数量原因,我们计划切换到 kafka。
有一点我有疑问。
在 rabbitMQ 中,当消费者使用来自 Q 的消息时,消息进入不同的阶段,未确认阶段。 client/consumer 需要一些时间来处理消息,在成功处理后,它会向 Q 发送确认,并且消息会从 Q 中删除。如果不成功,在定义的时间后如果 Q 没有得到确认,则消息附加在 Q 的末尾。这样我们就不会丢失任何消息。
根据我对 Kafka 的了解,我知道如果例如消息 100 没有被成功处理,偏移量不会增加,但是如果消息 101 被成功处理它会增加。所以我丢了消息100.
有没有办法保证 none 的消息会丢失。
除非您轮询新消息,否则您的消息偏移量不会增加。因此,您必须担心重新处理您的消息。
如果要将数据处理的结果存储到Kafka集群中,可以使用transaction feature of Kafka。这样你就可以支持exactly once delivery。您的所有更改都将被保存,或者 none 将被存储。
另一种方法是使您的处理场景幂等。您将为 Kafka 中的每条消息分配一个唯一 ID。处理消息时,将 ID 存储在数据库中。崩溃后,您通过查看数据库检查您的消息 ID 是否已被处理。
Kafka 不会从主题中删除消息,除非它到达 log.retention.bytes
log.retention.hours
log.retention.minutes
log.retention.ms
配置之一。因此,如果 offset 增加,您不会丢失以前的消息,您可以简单地将 offset 更改为您想要的位置。
您应该稍微了解一下 Kafka 中消息消费的工作原理。这是官方 Kafka 文档的消费者部分的 link:https://kafka.apache.org/documentation/#theconsumer
基本上,在 Kafka 中,只有经过足够长的时间后才会删除消息,这是使用 log.retention.hours
、log.retention.minutes
和 log.retention.ms
配置的,就像@Amin 所说的那样。
在 Kafka 中,任何数量的消费者都可以随时开始消费来自任何主题的消息,而不管其他消费者是否已经在消费来自同一主题的消息。 Kafka 使用存储在 Kafka 本身中的偏移量跟踪每个消费者在每个 topic/partition 上的位置。因此,如果您的消费者需要使用消息 100,就像您在问题中描述的那样,您可以简单地 "rewind" 到所需的消息,然后再次开始正常使用。无论您之前是否使用过它,或者其他消费者是否正在阅读该主题,都没有关系。
来自 Kafka 官方文档:
A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
我也遇到了同样的问题。如果我想用一种简单的方式表达,RabbitMQ 会计算每个
- 已发布但未消费
- 已发布、已使用和未确认的消息。
Kafka没有,所以你不能现成的,你必须自己实现它。
虽然有选项,使用kmq,性能会变得不到50%,看看
https://softwaremill.com/kafka-with-selective-acknowledgments-performance/