我可以在运行时向我的@kafkalistener 添加主题吗
Can i add topics to my @kafkalistener at runtime
我已经为主题数组创建了一个 bean,并且在运行时我向这个主题数组添加了一些主题,但是消费者没有更新主题并且仍然从主题数组中的第一个主题中消费。我希望消费者添加这些新主题并开始使用它们
@Autowired
private String[] topicArray;
@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
public void listen(...) {
...
}
否; 属性 在初始化期间计算一次。
您不能在运行时向现有侦听器容器添加主题。
但是,您可以将侦听器 bean 设为原型 bean,并在每次您想收听新主题时创建一个新容器。
这是一个例子:
@SpringBootApplication
public class So68744775Application {
public static void main(String[] args) {
SpringApplication.run(So68744775Application.class, args);
}
private String[] topics;
private final AtomicInteger count = new AtomicInteger();
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
Foo foo() {
return new Foo();
}
@Bean
Supplier<String> idProvider() {
return () -> "so68744775-" + count.getAndIncrement();
}
@Bean
Supplier<String[]> topicProvider() {
return () -> this.topics;
}
@Bean
ApplicationRunner runner(ApplicationContext context) {
return args -> {
this.topics = new String[] { "topic1", "topic2" };
context.getBean(Foo.class);
this.topics = new String[] { "topic3" };
context.getBean(Foo.class);
};
}
}
class Foo {
@KafkaListener(id = "#{idProvider.get()}", topics = "#{topicProvider.get()}", groupId = "grp")
public void listen(String in) {
System.out.println(in);
}
}
但是,最好省略 groupId
,这样每个容器都在自己的组中(id
属性)。这避免了在添加新容器时不必要的重新平衡。
我已经为主题数组创建了一个 bean,并且在运行时我向这个主题数组添加了一些主题,但是消费者没有更新主题并且仍然从主题数组中的第一个主题中消费。我希望消费者添加这些新主题并开始使用它们
@Autowired
private String[] topicArray;
@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
public void listen(...) {
...
}
否; 属性 在初始化期间计算一次。
您不能在运行时向现有侦听器容器添加主题。
但是,您可以将侦听器 bean 设为原型 bean,并在每次您想收听新主题时创建一个新容器。
这是一个例子:
@SpringBootApplication
public class So68744775Application {
public static void main(String[] args) {
SpringApplication.run(So68744775Application.class, args);
}
private String[] topics;
private final AtomicInteger count = new AtomicInteger();
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
Foo foo() {
return new Foo();
}
@Bean
Supplier<String> idProvider() {
return () -> "so68744775-" + count.getAndIncrement();
}
@Bean
Supplier<String[]> topicProvider() {
return () -> this.topics;
}
@Bean
ApplicationRunner runner(ApplicationContext context) {
return args -> {
this.topics = new String[] { "topic1", "topic2" };
context.getBean(Foo.class);
this.topics = new String[] { "topic3" };
context.getBean(Foo.class);
};
}
}
class Foo {
@KafkaListener(id = "#{idProvider.get()}", topics = "#{topicProvider.get()}", groupId = "grp")
public void listen(String in) {
System.out.println(in);
}
}
但是,最好省略 groupId
,这样每个容器都在自己的组中(id
属性)。这避免了在添加新容器时不必要的重新平衡。