重新启动侦听器并从最新消息继续

Restart listener and continue from latest message

案例

  1. 客户端是 ReplyingKafkaTemplate 个实例。
  2. 服务器是 ConcurrentMessageListenerContainer 使用方法上的 @KafkaListener@SendTo 注释创建的。
  3. ContainerFactory 使用 ContainerStoppingErrorHandler.
  4. 请求主题只有 1 个分区。
  5. 组 ID 是静态的。例如。测试消费者组。
  6. 请求发送超时。
  7. 由于抛出异常,服务器宕机 但是客户端不断发送在队列中排队的请求 请求主题。

当前行为

当服务器重新启动时,它会继续处理可能会超时的旧请求。

期望的行为

相反,最好继续上一条消息;从而跳过甚至未处理的消息,因为相应的请求会超时并重试。

问题

  1. 实现此目标的推荐方法是什么?
  2. 据我所知,我似乎必须手动设置初始偏移量。实现它的最简单方法是什么?

您的 @KafkaListener class 必须 extends AbstractConsumerSeekAware 并执行如下操作:

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        callback.seekToEnd(assignments.keySet());
    }

因此,每次当您的消费者加入该组时,它都会寻找所有分配的分区,直到最后跳过所有旧记录。