重消费Kafka消息的可能原因
Possible Reasons of Reconsuming Kafka Messages
昨天从日志中发现,kafka group coordinator发起group rebalance后,kafka重新消费了一些消息。这些消息已在两天前使用(从日志中确认)。
日志中报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次reblancing会导致重新消费消息呢?有什么问题?
我使用的是golang kafka客户端。这是代码
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest
并且我们在声明消息之前处理消息,所以我们似乎正在为 kafka 使用“至少发送一次”策略。我们一台机器上三个broker,另一台机器上只有一个消费者线程(go routine)。
对这种现象有什么解释吗?
我想消息肯定已经提交了,因为它们是两天前被消费的,否则为什么 kafka 会保留偏移量超过两天而不提交?
使用代码示例:
func (consumer *Consumer) ConsumeClaim(session
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
realHanlder(message) // consumed data here
session.MarkMessage(message, "") // mark offset
}
return nil
}
已添加:
应用重启后重新平衡。还有另外两次重启没有导致重新启动
kafka 的配置
log.retention.check.interval.ms=300000
log.retention.hours=168
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable = 真
auto.create.topics.enable=假
通过阅读golang saram client和kafka server的源码,最终找到原因如下
Consumer group offset保留时间为24hours,这是kafka的默认设置,而log 保留是我们明确设置的 7 天 。
我的服务端app是运行测试环境,很少人可以访问,也就是说kafka producer生产的消息可能很少,然后consumer组消费的消息也很少,因此消费者可能不会长时间提交任何偏移量。
当消费偏移量超过24小时未更新时,由于偏移量配置,kafka broker/coordinator将从分区中删除消费偏移量。下次 saram 从 kafka broker 查询偏移量在哪里时,客户端当然什么也得不到。注意我们使用 sarama.OffsetOldest 作为初始值,那么 sarama client 将从 kafka broker 保存的消息开始消费消息,这导致消息重新消费,这很可能发生是因为日志保留期 7 天
昨天从日志中发现,kafka group coordinator发起group rebalance后,kafka重新消费了一些消息。这些消息已在两天前使用(从日志中确认)。
日志中报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次reblancing会导致重新消费消息呢?有什么问题?
我使用的是golang kafka客户端。这是代码
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest
并且我们在声明消息之前处理消息,所以我们似乎正在为 kafka 使用“至少发送一次”策略。我们一台机器上三个broker,另一台机器上只有一个消费者线程(go routine)。
对这种现象有什么解释吗? 我想消息肯定已经提交了,因为它们是两天前被消费的,否则为什么 kafka 会保留偏移量超过两天而不提交?
使用代码示例:
func (consumer *Consumer) ConsumeClaim(session
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
realHanlder(message) // consumed data here
session.MarkMessage(message, "") // mark offset
}
return nil
}
已添加:
应用重启后重新平衡。还有另外两次重启没有导致重新启动
kafka 的配置
log.retention.check.interval.ms=300000
log.retention.hours=168
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable = 真
auto.create.topics.enable=假
通过阅读golang saram client和kafka server的源码,最终找到原因如下
Consumer group offset保留时间为24hours,这是kafka的默认设置,而log 保留是我们明确设置的 7 天 。
我的服务端app是运行测试环境,很少人可以访问,也就是说kafka producer生产的消息可能很少,然后consumer组消费的消息也很少,因此消费者可能不会长时间提交任何偏移量。
当消费偏移量超过24小时未更新时,由于偏移量配置,kafka broker/coordinator将从分区中删除消费偏移量。下次 saram 从 kafka broker 查询偏移量在哪里时,客户端当然什么也得不到。注意我们使用 sarama.OffsetOldest 作为初始值,那么 sarama client 将从 kafka broker 保存的消息开始消费消息,这导致消息重新消费,这很可能发生是因为日志保留期 7 天