在 spring kafka 中为心跳使用侦听器容器

Use listener container for heartbeats in spring kafka

我正在尝试设置容器来监听心跳主题 - 我只对监听器容器空闲时感兴趣 - 是否可以配置容器而不使用 @KafkaListener 而只使用 EventListener 来监听空闲事件?是否可以在空闲事件侦听器中设置搜索以获取主题的最后一条消息以获取时间和其他详细信息等上下文信息?

是否有一个事件可以知道消费者在空闲事件后开始消费而不需要使用@KafkaListener?

没有消费重启事件

不清楚您所说的“无需使用 @KafkaListener”是什么意思;您可以创建自己的侦听器...

@SpringBootApplication
public class So63957429Application {

    public static void main(String[] args) {
        SpringApplication.run(So63957429Application.class, args);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> factory, MyListener myListener) {

        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so63957429");
        container.getContainerProperties().setMessageListener(myListener);
        return container;
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63957429").partitions(1).replicas(1).build();
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so63957429", "foo");
    }

}

@Component
class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(ListenerUtils.recordToString(record, true));
    }

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println("Idle; rewinding to last");
        getSeekCallbacks().forEach((tp, callback) -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so63957429
spring.kafka.listener.idle-event-interval=5000
so63957429-0@0
Idle; rewinding to last
2020-09-18 10:50:54.408  INFO 17195 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63957429-1, groupId=so63957429] Seeking to offset 0 for partition so63957429-0
so63957429-0@0
Idle; rewinding to last
2020-09-18 10:50:59.436  INFO 17195 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63957429-1, groupId=so63957429] Seeking to offset 0 for partition so63957429-0
so63957429-0@0

编辑

如果需要访问最后消费的记录,可以添加RecordInterceptor:


    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> factory, MyListener myListener) {

        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so63957429");
        container.getContainerProperties().setMessageListener(myListener);
        container.setRecordInterceptor(myListener);
        return container;
    }

@Component
class MyListener extends AbstractConsumerSeekAware implements MessageListener<String, String>,
        RecordInterceptor<String, String> {

    private ConsumerRecord<String, String> lastRecord;

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(ListenerUtils.recordToString(record, true));
    }

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        System.out.println("Last Record: " + ListenerUtils.recordToString(this.lastRecord, true));
//      getSeekCallbacks().forEach((tp, callback) -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    @EventListener
    public void listen(ListenerContainerNoLongerIdleEvent event) {
        System.out.println(event);
    }

    @Override
    @Nullable
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
        return this.lastRecord = record;
    }

}

如果并发> 1,则需要将最后一条记录存储在 ThreadLocal