Spring Kafka 多主题一个 class 动态

Spring Kafka multiple topic for one class dynamically

我最近想在我的项目中添加一个使用 spring-kafka 的新行为。

这个想法真的很简单:

目标是动态创建新主题和消费者。我不能使用 spring kafka 注释,因为我需要它是动态的,所以我这样做了:

    @KafkaListener(topics = ScenarioTopics.NEW_SCENARIO)
    public void receive(final String topic) {
        logger.info("Get new scenario " + topic + ", creating new consumer");

        TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset(
                "APP2_" + topic, 1, 0L);

        ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
        containerProps.setMessageListener((MessageListener<Object, String>) message -> {
            // process my message
        });

        KafkaMessageListenerContainer<Object, String> container = new KafkaMessageListenerContainer<>(kafkaPeopleConsumerFactory, containerProps);
        container.start();
    }

这是行不通的。我可能遗漏了什么,但我不知道是什么。

这里有一些日志告诉我领导者不可用,这很奇怪,因为我得到了新的场景事件。



        2022-03-14 18:08:26.057  INFO 21892 --- [ntainer#0-0-C-1] o.l.b.v.c.c.i.k.KafkaScenarioListener    : Get new scenario W4BdDBEowY, creating new consumer
        2022-03-14 18:08:26.061  INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        allow.auto.create.topics = true
        [...lot of things...]
        value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

        2022-03-14 18:08:26.067  INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
        2022-03-14 18:08:26.067  INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
        2022-03-14 18:08:26.067  INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1647277706067
        2022-03-14 18:08:26.068  INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Subscribed to partition(s): PEOPLE_W4BdDBEowY-1
        2022-03-14 18:08:26.072  INFO 21892 --- [           -C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Seeking to offset 0 for partition PEOPLE_W4BdDBEowY-1
        2022-03-14 18:08:26.081  WARN 21892 --- [           -C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 2 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
        2022-03-14 18:08:26.081  INFO 21892 --- [           -C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Cluster ID: ebyKy-RVSRmUDaaeQqMaQg
        2022-03-14 18:18:04.882  WARN 21892 --- [           -C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5314 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
        2022-03-14 18:18:04.997  WARN 21892 --- [           -C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5315 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}

如何动态创建主题上的 kafka 消费者?我觉得我做的很不对,但是我找了很多,真的什么都没有找到。

这里有几个关于动态创建容器的答案...

Create consumer dynamically spring kafka

Dynamically start and off KafkaListener just to load previous messages at the start of a session