整合kafka Consumer spring批次

Integrate kafka Consumer spring batch

我有一个在 spring-boot 中开发的 Kafka 消费者,我能够从主题中读取消息。 我想将它与 Spring 批处理集成,因为我想创建一个批处理文件。 我不知道该怎么做。

Spring 批量添加对 read/write 数据的支持 from/to Kafka 主题 v4.2, see KafkaItemReader and KafkaItemWriter.

您还可以查看 Spring Tips installment Josh Long 在 Spring Batch 中关于 Kafka 支持的内容。

尝试如下:

private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@KafkaListener(id = "batch-listener", topics = "${app.topic.batch}")
public void receive(@Payload List<String> messages,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                    @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

    LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
    LOG.info("beginning to consume batch messages");

    for (int i = 0; i < messages.size(); i++) {

        LOG.info("received message='{}' with partition-offset='{}'",
                messages.get(i), partitions.get(i) + "-" + offsets.get(i));

    }
    LOG.info("all batch messages consumed");
}



 @EnableKafka
 @Configuration
 public class ListenerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
    return factory;
}

}

参考:https://memorynotfound.com/spring-kafka-batch-listener-example/