在 Spring Kafka 运行时更改监听主题
Change listening topic during runtime in Spring Kafka
我有一个前端,我可以在其中触发将我的 Kafka 主题更改为另一个主题。当我这样做时,Java Springboot 后端也应该 更改监听那个新主题 以使用传入消息。问题是这必须在 运行时 期间发生。因此 @KafkaListener
不是一个选项,因为它至少在启动时需要主题名称。
我将新主题作为 UUID 字符串传递给如下所示的方法。这是许多尝试之一,它不会识别新 uuid 主题中的任何消息(即使有消息)。新主题和消息由另一项服务生成(这部分工作正常)。我从另一个并没有真正帮助我的问题中得到这个例子: And I also read: How to create separate Kafka listener for each topic dynamically in springboot?
尽管如此,在应用程序启动和第一次调用 changeListener
方法期间,我在控制台中得到了这个日志行:
INFO 9636 --- [main] o.a.k.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-group-1, groupId=group] Subscribed to topic(s): 09574388-e8e1-4cef-8e67-881f69850f8f
目标是在 Kafka 的新主题中每次有消息时 // do other stuff with message
调用 MessageListener 的方法。
是否可以在运行时更改主题?如果可以,如何更改?
如果您需要更多信息,请随时询问。
public void changeListener(String uuid) {
ContainerProperties containerProps = new ContainerProperties(uuid);
containerProps.setMessageListener(
(MessageListener<UUID, String>) message -> {
LOG.info("received: " + message);
// do other stuff with message
}
);
KafkaMessageListenerContainer<UUID, String> container =
new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(consumerProps()), containerProps);
container.start();
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8069");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
像那样手动创建容器不是一个好主意,因为它需要 spring 进行初始化;您将无法获得全部功能。
如果您使用的是spring引导;使用其 ConcurrentMessageListenerContainerFactory
创建容器。
如果你没有使用引导,请添加你自己的 ConcurrentMessageListenerContainerFactory
@Bean
。
对多个侦听器容器使用相同的 group.id
也不是一个好主意,因为对一个容器进行重新平衡会导致对其他容器进行不必要的重新平衡。
为了读取新主题中的现有记录,您必须设置 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="earliest:"
(默认为 latest
)。
据我所知,通过源代码,您无法在运行时更改主题。因此,您需要停止当前容器并重新创建一个新容器。
在这种情况下,我建议不要使用注册表并自行管理容器,因为您似乎无法从注册表中删除容器,最终会导致内存泄漏。
您可以自己自动装配 KafkaListenerContainerFactory
。这个工厂需要一个端点。我必须承认,如果您只想更改主题并调用回调,那么设置端点对我来说似乎有点痛苦,因为所有可用的实现都使用带有 bean 和方法引用的元编程。
下面的片段应该可以帮助您入门,尽管它可能需要更多调整。
@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {
private KafkaListenerContainerFactory<?> factory;
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Autowired
public void setFactory(KafkaListenerContainerFactory<?> factory) {
this.factory = factory;
}
@EventListener(classes = {ApplicationStartedEvent.class})
public void onStarted() throws InterruptedException, NoSuchMethodException {
var listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_3"));
registry.stop();
listenerContainer.start();
Thread.sleep(2000);
listenerContainer.stop();
listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_4"));
listenerContainer.start();
Thread.sleep(2000);
listenerContainer.stop();
}
private KafkaListenerEndpoint getEndpoint(String topic) throws NoSuchMethodException {
var listenerEndpoint = new MethodKafkaListenerEndpoint<String, String>();
listenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
listenerEndpoint.setBean(this);
listenerEndpoint.setMethod(getClass().getMethod("onMessage", String.class, String.class));
listenerEndpoint.setTopics(topic);
return listenerEndpoint;
}
public void onMessage(String key, String value) {
System.out.println(key + ":" + value)
}
}
附带说明一下,如果您想访问注册表,您可以实施 KafkaListenerConfigurer,因为它不是自动装配的。但同样,如果您想终止容器,请不要使用它,因为据我所知,您无法删除引用。
我有一个前端,我可以在其中触发将我的 Kafka 主题更改为另一个主题。当我这样做时,Java Springboot 后端也应该 更改监听那个新主题 以使用传入消息。问题是这必须在 运行时 期间发生。因此 @KafkaListener
不是一个选项,因为它至少在启动时需要主题名称。
我将新主题作为 UUID 字符串传递给如下所示的方法。这是许多尝试之一,它不会识别新 uuid 主题中的任何消息(即使有消息)。新主题和消息由另一项服务生成(这部分工作正常)。我从另一个并没有真正帮助我的问题中得到这个例子:
尽管如此,在应用程序启动和第一次调用 changeListener
方法期间,我在控制台中得到了这个日志行:
INFO 9636 --- [main] o.a.k.clients.consumer.KafkaConsumer: [Consumer clientId=consumer-group-1, groupId=group] Subscribed to topic(s): 09574388-e8e1-4cef-8e67-881f69850f8f
目标是在 Kafka 的新主题中每次有消息时 // do other stuff with message
调用 MessageListener 的方法。
是否可以在运行时更改主题?如果可以,如何更改?
如果您需要更多信息,请随时询问。
public void changeListener(String uuid) {
ContainerProperties containerProps = new ContainerProperties(uuid);
containerProps.setMessageListener(
(MessageListener<UUID, String>) message -> {
LOG.info("received: " + message);
// do other stuff with message
}
);
KafkaMessageListenerContainer<UUID, String> container =
new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(consumerProps()), containerProps);
container.start();
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8069");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
像那样手动创建容器不是一个好主意,因为它需要 spring 进行初始化;您将无法获得全部功能。
如果您使用的是spring引导;使用其 ConcurrentMessageListenerContainerFactory
创建容器。
如果你没有使用引导,请添加你自己的 ConcurrentMessageListenerContainerFactory
@Bean
。
对多个侦听器容器使用相同的 group.id
也不是一个好主意,因为对一个容器进行重新平衡会导致对其他容器进行不必要的重新平衡。
为了读取新主题中的现有记录,您必须设置 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="earliest:"
(默认为 latest
)。
据我所知,通过源代码,您无法在运行时更改主题。因此,您需要停止当前容器并重新创建一个新容器。
在这种情况下,我建议不要使用注册表并自行管理容器,因为您似乎无法从注册表中删除容器,最终会导致内存泄漏。
您可以自己自动装配 KafkaListenerContainerFactory
。这个工厂需要一个端点。我必须承认,如果您只想更改主题并调用回调,那么设置端点对我来说似乎有点痛苦,因为所有可用的实现都使用带有 bean 和方法引用的元编程。
下面的片段应该可以帮助您入门,尽管它可能需要更多调整。
@SpringBootApplication
@EnableKafka
public class KafkaDemoApplication {
private KafkaListenerContainerFactory<?> factory;
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Autowired
public void setFactory(KafkaListenerContainerFactory<?> factory) {
this.factory = factory;
}
@EventListener(classes = {ApplicationStartedEvent.class})
public void onStarted() throws InterruptedException, NoSuchMethodException {
var listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_3"));
registry.stop();
listenerContainer.start();
Thread.sleep(2000);
listenerContainer.stop();
listenerContainer = factory.createListenerContainer(getEndpoint("my_topic_4"));
listenerContainer.start();
Thread.sleep(2000);
listenerContainer.stop();
}
private KafkaListenerEndpoint getEndpoint(String topic) throws NoSuchMethodException {
var listenerEndpoint = new MethodKafkaListenerEndpoint<String, String>();
listenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
listenerEndpoint.setBean(this);
listenerEndpoint.setMethod(getClass().getMethod("onMessage", String.class, String.class));
listenerEndpoint.setTopics(topic);
return listenerEndpoint;
}
public void onMessage(String key, String value) {
System.out.println(key + ":" + value)
}
}
附带说明一下,如果您想访问注册表,您可以实施 KafkaListenerConfigurer,因为它不是自动装配的。但同样,如果您想终止容器,请不要使用它,因为据我所知,您无法删除引用。