Spring kafka消费者,在运行时寻找偏移量?

Spring kafka consumer, seek offset at runtime?

我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每条记录,这也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。

但是如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它直到它成功。为此,我需要对最后一个偏移量进行 运行 时间手动查找。

KafkaMessageListenerContainer 是否可行?

参见Seeking to a Specific Offset

In order to seek, your listener must implement ConsumerSeekAware which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

please refer to Spring Kafka document

“对于现有组 ID,初始偏移量是该组 ID 的当前偏移量”。

Confluent 还提到: “如果消费者崩溃或关闭,其分区将被重新分配给另一个成员,该成员将从每个分区的最后提交的偏移量开始消费”

这意味着,在您的情况下,当消费者恢复时,它将在最后提交的偏移量处继续。如果您通过

手动提交,您也不需要“寻找失败的偏移量”

ENABLE_AUTO_COMMIT_CONFIG = False

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)