Spring Kafka 多主题一个 class 动态
Spring Kafka multiple topic for one class dynamically
我最近想在我的项目中添加一个使用 spring-kafka 的新行为。
这个想法真的很简单:
- App1 创建一个新的场景名称“SCENARIO_1”并在主题“NEW_SCENARIO”中发布此字符串“
”
- App1 发布了一些关于主题“APP2-SCENARIO_1”和“APP3-SCENARIO_1”的消息
- App2 (group-id=app2) 侦听 NEW_SCENARIO 并创建一个新的消费者
- App3 (group-id=app3) 侦听 NEW_SCENARIO 并创建一个新的消费者
目标是动态创建新主题和消费者。我不能使用 spring kafka 注释,因为我需要它是动态的,所以我这样做了:
@KafkaListener(topics = ScenarioTopics.NEW_SCENARIO)
public void receive(final String topic) {
logger.info("Get new scenario " + topic + ", creating new consumer");
TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset(
"APP2_" + topic, 1, 0L);
ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
containerProps.setMessageListener((MessageListener<Object, String>) message -> {
// process my message
});
KafkaMessageListenerContainer<Object, String> container = new KafkaMessageListenerContainer<>(kafkaPeopleConsumerFactory, containerProps);
container.start();
}
这是行不通的。我可能遗漏了什么,但我不知道是什么。
这里有一些日志告诉我领导者不可用,这很奇怪,因为我得到了新的场景事件。
2022-03-14 18:08:26.057 INFO 21892 --- [ntainer#0-0-C-1] o.l.b.v.c.c.i.k.KafkaScenarioListener : Get new scenario W4BdDBEowY, creating new consumer
2022-03-14 18:08:26.061 INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
[...lot of things...]
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1647277706067
2022-03-14 18:08:26.068 INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Subscribed to partition(s): PEOPLE_W4BdDBEowY-1
2022-03-14 18:08:26.072 INFO 21892 --- [ -C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Seeking to offset 0 for partition PEOPLE_W4BdDBEowY-1
2022-03-14 18:08:26.081 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 2 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
2022-03-14 18:08:26.081 INFO 21892 --- [ -C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Cluster ID: ebyKy-RVSRmUDaaeQqMaQg
2022-03-14 18:18:04.882 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5314 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
2022-03-14 18:18:04.997 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5315 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
如何动态创建主题上的 kafka 消费者?我觉得我做的很不对,但是我找了很多,真的什么都没有找到。
这里有几个关于动态创建容器的答案...
Create consumer dynamically spring kafka
Dynamically start and off KafkaListener just to load previous messages at the start of a session
我最近想在我的项目中添加一个使用 spring-kafka 的新行为。
这个想法真的很简单:
- App1 创建一个新的场景名称“SCENARIO_1”并在主题“NEW_SCENARIO”中发布此字符串“ ”
- App1 发布了一些关于主题“APP2-SCENARIO_1”和“APP3-SCENARIO_1”的消息
- App2 (group-id=app2) 侦听 NEW_SCENARIO 并创建一个新的消费者
- App3 (group-id=app3) 侦听 NEW_SCENARIO 并创建一个新的消费者
目标是动态创建新主题和消费者。我不能使用 spring kafka 注释,因为我需要它是动态的,所以我这样做了:
@KafkaListener(topics = ScenarioTopics.NEW_SCENARIO)
public void receive(final String topic) {
logger.info("Get new scenario " + topic + ", creating new consumer");
TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset(
"APP2_" + topic, 1, 0L);
ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
containerProps.setMessageListener((MessageListener<Object, String>) message -> {
// process my message
});
KafkaMessageListenerContainer<Object, String> container = new KafkaMessageListenerContainer<>(kafkaPeopleConsumerFactory, containerProps);
container.start();
}
这是行不通的。我可能遗漏了什么,但我不知道是什么。
这里有一些日志告诉我领导者不可用,这很奇怪,因为我得到了新的场景事件。
2022-03-14 18:08:26.057 INFO 21892 --- [ntainer#0-0-C-1] o.l.b.v.c.c.i.k.KafkaScenarioListener : Get new scenario W4BdDBEowY, creating new consumer
2022-03-14 18:08:26.061 INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
[...lot of things...]
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
2022-03-14 18:08:26.067 INFO 21892 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1647277706067
2022-03-14 18:08:26.068 INFO 21892 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Subscribed to partition(s): PEOPLE_W4BdDBEowY-1
2022-03-14 18:08:26.072 INFO 21892 --- [ -C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Seeking to offset 0 for partition PEOPLE_W4BdDBEowY-1
2022-03-14 18:08:26.081 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 2 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
2022-03-14 18:08:26.081 INFO 21892 --- [ -C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Cluster ID: ebyKy-RVSRmUDaaeQqMaQg
2022-03-14 18:18:04.882 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5314 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
2022-03-14 18:18:04.997 WARN 21892 --- [ -C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-people-creator-2, groupId=people-creator] Error while fetching metadata with correlation id 5315 : {PEOPLE_W4BdDBEowY=LEADER_NOT_AVAILABLE}
如何动态创建主题上的 kafka 消费者?我觉得我做的很不对,但是我找了很多,真的什么都没有找到。
这里有几个关于动态创建容器的答案...
Dynamically start and off KafkaListener just to load previous messages at the start of a session