是否有读取 Kafka 主题的 Spring Schedule 示例?

Is there any example of Spring Schedule that reads Kafka topic?

我们试图在指定的 window 时间从 Kafka 读取数据(因此我们有 Kafka 消费者),这意味着避免在其他时间读取数据。但是,我们不确定在时间段到期后如何关闭消费者。我想知道是否有任何例子说明如何做到这一点?非常感谢您帮助我们。

您可以禁用 autoStartup 然后使用 KafkaListenerEndpointRegistry startstop 方法手动启动 kafka containers @KafkaListener Lifecycle Management

public class KafkaConsumer {

 @Autowired
 private KafkaListenerEndpointRegistry registry;

  @KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
  public void listen(...) { ... }

  @Schedule(cron = "")
  public void scheduledMethod() {

   registry.start();

   registry.stop()
  }

但是在上面的方法中,不能保证所有来自kafka的消息都会在那个时间范围内被消费(这取决于负载和处理速度)

我有相同的用例,但我编写了调度程序来指定一批的最大轮询记录,并保留一个计数器,如果计数器与最大轮询记录匹配,那么我认为该批次的处理已经完成处理了它在一次投票中获得的记录。

然后我取消订阅该主题并关闭消费者。下次调度程序 运行 时,它将再次处理指定限制的最大轮询记录。

fixedDelayString 实现调度程序在指定时间限制后启动的目的,一旦前一个完成。

@EnableScheduling
public class MessageScheduler{

        @Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
        public void run(){

            /*write your kafka consumer here with manual commit*/

            /*once your batch is finished processing unsubcribe and close the consumer*/
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
        }

}