What is the simplest Spring Kafka @KafkaListener configuration to consume all records from a set of compacted topics?

I have the names of several compacted Kafka topics (topic1, topic2, ..., topicN) defined in my Spring application.yaml file. I want to consume all records on every partition of each topic at startup. The number of partitions per topic is not known in advance.
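For context, the topic-name properties referenced below via `${topic1}` etc. might be declared in application.yaml roughly like this (the actual topic names are placeholders, not from the question):

```yaml
topic1: my-compacted-topic-1
topic2: my-compacted-topic-2
# ...
topicN: my-compacted-topic-n
```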

The official Spring Kafka 2.6.1 documentation suggests that the simplest way to do this is to implement a PartitionFinder and use it in a SpEL expression to dynamically look up the number of partitions for a topic, and to then use a * wildcard in the partitions attribute of a @TopicPartition annotation (see Explicit Partition Assignment in the @KafkaListener Annotation documentation):

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
            partitions = "#{@finder.partitions('compacted')}",
            partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}
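The PartitionFinder used in the SpEL expression above is not shown in the question; a sketch along the lines of the version in the Spring Kafka reference documentation follows (my adaptation; the bean must be registered under the name `finder` used in the expression):

```java
@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
    return new PartitionFinder(consumerFactory);
}

public static class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    // Returns the partition numbers of the topic as strings ("0", "1", ...),
    // suitable for the partitions attribute of @TopicPartition.
    public String[] partitions(String topic) {
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                    .map(pi -> "" + pi.partition())
                    .toArray(String[]::new);
        }
    }
}
```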

Since I have several topics, the resulting code is very verbose:

@KafkaListener(topicPartitions = {
        @TopicPartition(
                topic = "${topic1}",
                partitions = "#{@finder.partitions('${topic1}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        @TopicPartition(
                topic = "${topic2}",
                partitions = "#{@finder.partitions('${topic2}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        // and many more @TopicPartitions...
        @TopicPartition(
                topic = "${topicN}",
                partitions = "#{@finder.partitions('${topicN}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        )
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}

How can I configure the topicPartitions attribute of the @KafkaListener annotation with a dynamically generated array of @TopicPartition entries (one for each of my N topics)?

This is not currently possible with @KafkaListener; please open a new feature issue on GitHub.

The only work-around I can think of is to programmatically create the listener container from the container factory and create a listener adapter. I can provide an example if you need it.

EDIT

Here is an example:

@SpringBootApplication
public class So64022266Application {

    public static void main(String[] args) {
        SpringApplication.run(So64022266Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
    }

    @Bean
    ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
            PartitionFinder finder,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            MyListener listener) throws Exception {

        MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
        ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
        container.getContainerProperties().setGroupId("someGroup");
        return container;
    }

    private MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
            MyListener listener) throws NoSuchMethodException {

        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(listener);
        endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
        endpoint.setTopicPartitions(Arrays.stream(topics)
            .flatMap(topic -> finder.partitions(topic))
            .toArray(TopicPartitionOffset[]::new));
        endpoint.setMessageHandlerMethodFactory(methodFactory());
        return endpoint;
    }

    @Bean
    DefaultMessageHandlerMethodFactory methodFactory() {
        return new DefaultMessageHandlerMethodFactory();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentMessageListenerContainer<String, String> container) {

        return args -> {
            System.out.println(container.getAssignedPartitions());
            template.send("so64022266-1", "key1", "foo");
            template.send("so64022266-2", "key2", "bar");
        };
    }

}

@Component
class MyListener {

    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
        System.out.println(key + ":" + payload);
    }

}

@Component
class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public Stream<TopicPartitionOffset> partitions(String topic) {
        System.out.println("+" + topic + "+");
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                    .map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
        }
    }

}
application.properties:

topics=so64022266-1, so64022266-2

If you need to deal with tombstone records (null values), we need to enhance the handler factory; we currently don't expose the framework's handler factory.
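As a possible interim work-around (a sketch and my own assumption, not part of the answer above): declare the listener method to take the raw ConsumerRecord so that no payload conversion is involved at all; a tombstone then simply arrives with value() == null. The endpoint's method lookup would change to getDeclaredMethod("listen", ConsumerRecord.class) accordingly.

```java
@Component
class MyListener {

    // Taking the raw ConsumerRecord bypasses payload conversion, so the
    // handler factory never sees a null payload; tombstones show up as
    // records whose value() is null.
    public void listen(ConsumerRecord<String, String> record) {
        if (record.value() == null) {
            System.out.println("tombstone for key " + record.key());
        }
        else {
            System.out.println(record.key() + ":" + record.value());
        }
    }

}
```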