重新启动侦听器并从最新消息继续
Restart listener and continue from latest message
案例
- 客户端是
ReplyingKafkaTemplate
个实例。
- 服务器是
ConcurrentMessageListenerContainer
使用方法上的 @KafkaListener
和 @SendTo
注释创建的。
- ContainerFactory 使用
ContainerStoppingErrorHandler
.
- 请求主题只有 1 个分区。
- 组 ID 是静态的。例如。测试消费者组。
- 请求发送超时。
- 由于抛出异常,服务器宕机
但是客户端不断发送在队列中排队的请求
请求主题。
当前行为
当服务器重新启动时,它会继续处理可能会超时的旧请求。
期望的行为
相反,最好继续上一条消息;从而跳过甚至未处理的消息,因为相应的请求会超时并重试。
问题
- 实现此目标的推荐方法是什么?
- 据我所知,我似乎必须手动设置初始偏移量。实现它的最简单方法是什么?
您的 @KafkaListener
class 必须 extends AbstractConsumerSeekAware
并执行如下操作:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
super.onPartitionsAssigned(assignments, callback);
callback.seekToEnd(assignments.keySet());
}
因此,每次当您的消费者加入该组时,它都会寻找所有分配的分区,直到最后跳过所有旧记录。
案例
- 客户端是
ReplyingKafkaTemplate
个实例。 - 服务器是
ConcurrentMessageListenerContainer
使用方法上的@KafkaListener
和@SendTo
注释创建的。 - ContainerFactory 使用
ContainerStoppingErrorHandler
. - 请求主题只有 1 个分区。
- 组 ID 是静态的。例如。测试消费者组。
- 请求发送超时。
- 由于抛出异常,服务器宕机 但是客户端不断发送在队列中排队的请求 请求主题。
当前行为
当服务器重新启动时,它会继续处理可能会超时的旧请求。
期望的行为
相反,最好继续上一条消息;从而跳过甚至未处理的消息,因为相应的请求会超时并重试。
问题
- 实现此目标的推荐方法是什么?
- 据我所知,我似乎必须手动设置初始偏移量。实现它的最简单方法是什么?
您的 @KafkaListener
class 必须 extends AbstractConsumerSeekAware
并执行如下操作:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
super.onPartitionsAssigned(assignments, callback);
callback.seekToEnd(assignments.keySet());
}
因此,每次当您的消费者加入该组时,它都会寻找所有分配的分区,直到最后跳过所有旧记录。