如果 Kafka 的消费者处理一条消息的时间过长怎么办? Kafka 会不会将这个分区重新分配给另一个消费者并且消息会被双重处理?
What if a Kafka's consumer handles a message too long? Will Kafka reappoint this partition to another consumer and the message will doubly handled?
假设Kafka
,1 partition
,2 consumers
。(第二个消费者空闲)
假设第一个消费了一条消息,去处理它与其他 3 个服务,突然粘在其中一个上,错过了 Kafka 的超时。
Kafka会不会将分区重新分配给第二个消费者并且消息会被双重处理(假设第一个最终成功)?
What if a Kafka's consumer handles a message too long? Will Kafka reappoint this partition to another consumer and the message will doubly handled?
是的,没错。如果 Kafka consumer 处理一个消息的时间太长,后续的 poll() 被延迟,Kafka 会重新指定这个 partition 给另一个 consumer,消息会被再次(又)处理。
为了更清楚,首先我们需要决定并定义'How long is too long?'。
这是由 属性 max.poll.interval.ms
定义的。来自docs,
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
如果在这段时间内没有调用 poll(),则重新平衡消费者组。
还有一个属性auto.commit.interval.ms
。自动提交偏移量检查将仅在轮询期间调用 - 它检查经过的时间是否大于配置的自动提交间隔时间,如果结果为是,则提交偏移量。
如果 Kafka 消费者处理记录的时间太长,则后续的 poll() 调用也会延迟,并且不会提交最后一个 poll() 返回的偏移量。如果此时发生rebalance,分配给这个partition的新consumer client会重新开始处理消息。
可以通过增加此值来避免消费者组重新平衡和由此产生的分区重新分配。这将增加轮询之间的允许间隔,并为消费者提供更多时间来处理从 poll() 返回的记录。消费者只会在轮询调用中加入再平衡,因此增加最大轮询间隔也会延迟组再平衡。
将最大轮询间隔增加到一个大值还有一个问题。如果消费者因其他原因死亡,则检测故障所需的时间比配置的 max.poll.interval.ms
间隔更长。
session.timeout.ms
和 heartbeat.interval.ms
在这种情况下可用于尽早检测到总故障。
有关这些参数的更多详细信息:
- 请参考
- KIP-62
请注意,为 session.timeout.ms
配置的值必须在 properties
代理配置中配置的允许范围内
- group.min.session.timeout.ms
- group.max.session.timeout.ms
否则,启动消费者客户端时会抛出以下异常。
Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
更新:避免再次处理消息
KafkaConsumer class commitAsync()
中还有一个方法可以触发commit offsets操作
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();
有关 commitSync() 和 commitAsync() 的更多详细信息,请查看 this thread
手动提交offset是表示已经处理了offset,这样Kafka就不会再发送同一个分区的committed记录了。手动提交偏移量时,请务必注意,如果消费者因任何原因在处理记录之前死亡,则有可能不会再次处理这些记录。
假设Kafka
,1 partition
,2 consumers
。(第二个消费者空闲)
假设第一个消费了一条消息,去处理它与其他 3 个服务,突然粘在其中一个上,错过了 Kafka 的超时。
Kafka会不会将分区重新分配给第二个消费者并且消息会被双重处理(假设第一个最终成功)?
What if a Kafka's consumer handles a message too long? Will Kafka reappoint this partition to another consumer and the message will doubly handled?
是的,没错。如果 Kafka consumer 处理一个消息的时间太长,后续的 poll() 被延迟,Kafka 会重新指定这个 partition 给另一个 consumer,消息会被再次(又)处理。
为了更清楚,首先我们需要决定并定义'How long is too long?'。
这是由 属性 max.poll.interval.ms
定义的。来自docs,
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
如果在这段时间内没有调用 poll(),则重新平衡消费者组。
还有一个属性auto.commit.interval.ms
。自动提交偏移量检查将仅在轮询期间调用 - 它检查经过的时间是否大于配置的自动提交间隔时间,如果结果为是,则提交偏移量。
如果 Kafka 消费者处理记录的时间太长,则后续的 poll() 调用也会延迟,并且不会提交最后一个 poll() 返回的偏移量。如果此时发生rebalance,分配给这个partition的新consumer client会重新开始处理消息。
可以通过增加此值来避免消费者组重新平衡和由此产生的分区重新分配。这将增加轮询之间的允许间隔,并为消费者提供更多时间来处理从 poll() 返回的记录。消费者只会在轮询调用中加入再平衡,因此增加最大轮询间隔也会延迟组再平衡。
将最大轮询间隔增加到一个大值还有一个问题。如果消费者因其他原因死亡,则检测故障所需的时间比配置的 max.poll.interval.ms
间隔更长。
session.timeout.ms
和 heartbeat.interval.ms
在这种情况下可用于尽早检测到总故障。
有关这些参数的更多详细信息:
- 请参考
- KIP-62
请注意,为 session.timeout.ms
配置的值必须在 properties
- group.min.session.timeout.ms
- group.max.session.timeout.ms
否则,启动消费者客户端时会抛出以下异常。
Exception in thread "main" org.apache.kafka.common.errors.InvalidSessionTimeoutException:
The session timeout is not within the range allowed by the broker
(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms)
更新:避免再次处理消息
KafkaConsumer class commitAsync()
中还有一个方法可以触发commit offsets操作
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
kafkaConsumer.commitAsync();
有关 commitSync() 和 commitAsync() 的更多详细信息,请查看 this thread
手动提交offset是表示已经处理了offset,这样Kafka就不会再发送同一个分区的committed记录了。手动提交偏移量时,请务必注意,如果消费者因任何原因在处理记录之前死亡,则有可能不会再次处理这些记录。