Spring kafka消费者,在运行时寻找偏移量?
Spring kafka consumer, seek offset at runtime?
我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每条记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功。为此,我需要对最后一个偏移量进行 运行 时间手动查找。
KafkaMessageListenerContainer 是否可行?
参见Seeking to a Specific Offset。
In order to seek, your listener must implement ConsumerSeekAware
which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer
) you should store the callback in a ThreadLocal
or some other structure keyed by the listener Thread.
please refer to Spring Kafka document
“对于现有组 ID,初始偏移量是该组 ID 的当前偏移量”。
Confluent 还提到:
“如果消费者崩溃或关闭,其分区将被重新分配给另一个成员,该成员将从每个分区的最后提交的偏移量开始消费”
这意味着,在您的情况下,当消费者恢复时,它将在最后提交的偏移量处继续。如果您通过
手动提交,您也不需要“寻找失败的偏移量”
ENABLE_AUTO_COMMIT_CONFIG = False
和
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)
我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每条记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功。为此,我需要对最后一个偏移量进行 运行 时间手动查找。
KafkaMessageListenerContainer 是否可行?
参见Seeking to a Specific Offset。
In order to seek, your listener must implement
ConsumerSeekAware
which has the following methods:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a
ConcurrentMessageListenerContainer
) you should store the callback in aThreadLocal
or some other structure keyed by the listener Thread.
please refer to Spring Kafka document
“对于现有组 ID,初始偏移量是该组 ID 的当前偏移量”。
Confluent 还提到: “如果消费者崩溃或关闭,其分区将被重新分配给另一个成员,该成员将从每个分区的最后提交的偏移量开始消费”
这意味着,在您的情况下,当消费者恢复时,它将在最后提交的偏移量处继续。如果您通过
手动提交,您也不需要“寻找失败的偏移量”ENABLE_AUTO_COMMIT_CONFIG = False
和
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)