Spring @KafkaListener with topicPattern:处理运行时主题创建
Spring @KafkaListener with topicPattern: handle runtime topic creation
我正在使用 Spring @KafkaListener 和 topicPattern。如果在此应用程序运行期间我创建了一个与模式匹配的新主题并开始发布到该主题,则侦听器应用程序将忽略这些消息。换句话说,它只在启动时拉取所有与模式匹配的主题并收听这些主题。
“刷新”它的最简单方法是什么?谢谢!
我认为这就是设计的方式。 Kafka 客户端必须先订阅一个主题才能获取消息。
在这种情况下,Kafka client/consumer 在启动时订阅主题匹配模式一次,这就是它所进行的。
但这真是一个有趣的问题。最简单最简单的答案是“重新启动 client/consumer”。但是,会留意其他人的回答以了解任何想法。
默认情况下,根据https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
的设置,会在5分钟内(默认)提取新主题
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
您可以减少它以加快速度,但会增加流量。
编辑
这表明它按预期工作...
@SpringBootApplication
public class So71386069Application {
private static final Logger log = LoggerFactory.getLogger(So71386069Application.class);
public static void main(String[] args) {
SpringApplication.run(So71386069Application.class, args);
}
@KafkaListener(id = "so71386069", topicPattern = "so71386069.*",
properties = "metadata.max.age.ms:60000")
void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so71386069").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaAdmin admin) {
return args -> {
try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
IntStream.range(0, 10).forEach(i -> {
try {
Thread.sleep(30_000);
String topic = "so71386069-" + i;
log.info("Creating {}", topic);
client.createTopics(Collections.singleton(
TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
e.printStackTrace();
}
});
}
};
}
}
2022-03-07 15:41:07.131 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions assigned: [so71386069-0]
2022-03-07 15:41:34.007 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-0
2022-03-07 15:42:04.193 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-1
...
2022-03-07 15:42:07.590 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions revoked: [so71386069-0]
...
2022-03-07 15:42:07.599 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions assigned: [so71386069-0, so71386069-1-0, so71386069-0-0]
2022-03-07 15:42:34.378 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-2
2022-03-07 15:43:04.554 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-3
...
2022-03-07 15:43:08.403 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions revoked: [so71386069-0, so71386069-1-0, so71386069-0-0]
...
2022-03-07 15:43:08.411 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions assigned: [so71386069-0, so71386069-3-0, so71386069-2-0, so71386069-1-0, so71386069-0-0]
...
我正在使用 Spring @KafkaListener 和 topicPattern。如果在此应用程序运行期间我创建了一个与模式匹配的新主题并开始发布到该主题,则侦听器应用程序将忽略这些消息。换句话说,它只在启动时拉取所有与模式匹配的主题并收听这些主题。
“刷新”它的最简单方法是什么?谢谢!
我认为这就是设计的方式。 Kafka 客户端必须先订阅一个主题才能获取消息。
在这种情况下,Kafka client/consumer 在启动时订阅主题匹配模式一次,这就是它所进行的。
但这真是一个有趣的问题。最简单最简单的答案是“重新启动 client/consumer”。但是,会留意其他人的回答以了解任何想法。
默认情况下,根据https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms
的设置,会在5分钟内(默认)提取新主题The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
您可以减少它以加快速度,但会增加流量。
编辑
这表明它按预期工作...
@SpringBootApplication
public class So71386069Application {
private static final Logger log = LoggerFactory.getLogger(So71386069Application.class);
public static void main(String[] args) {
SpringApplication.run(So71386069Application.class, args);
}
@KafkaListener(id = "so71386069", topicPattern = "so71386069.*",
properties = "metadata.max.age.ms:60000")
void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so71386069").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaAdmin admin) {
return args -> {
try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
IntStream.range(0, 10).forEach(i -> {
try {
Thread.sleep(30_000);
String topic = "so71386069-" + i;
log.info("Creating {}", topic);
client.createTopics(Collections.singleton(
TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
e.printStackTrace();
}
});
}
};
}
}
2022-03-07 15:41:07.131 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions assigned: [so71386069-0]
2022-03-07 15:41:34.007 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-0
2022-03-07 15:42:04.193 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-1
...
2022-03-07 15:42:07.590 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions revoked: [so71386069-0]
...
2022-03-07 15:42:07.599 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions assigned: [so71386069-0, so71386069-1-0, so71386069-0-0]
2022-03-07 15:42:34.378 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-2
2022-03-07 15:43:04.554 INFO 33630 --- [ main] com.example.demo.So71386069Application
: Creating so71386069-3
...
2022-03-07 15:43:08.403 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions revoked: [so71386069-0, so71386069-1-0, so71386069-0-0]
...
2022-03-07 15:43:08.411 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: so71386069: partitions assigned: [so71386069-0, so71386069-3-0, so71386069-2-0, so71386069-1-0, so71386069-0-0]
...