Spring kafka 在运行时重新创建 Kafka Stream Topology
Spring kafka re-create Kafka Stream Topology in runtime
我有一个基于 spring 引导、spring-kafka 和 kafka-streams 的应用程序。
当应用程序启动时,它会创建带有默认主题列表的 kafka 流拓扑。
我需要做的是在 运行 时间内完成 edit/recreate 拓扑。例如,当应用程序已经 运行ning 时,有新的主题名称出现,我想将这个主题添加到我的拓扑中。
目前我正在考虑以某种方式删除现有拓扑,关闭并清理 KafkaStreams,运行 我创建拓扑但使用新主题名称并再次启动 KafkaStreams 的逻辑。我不想重新启动我的应用程序。
有人可以建议我如何在 运行 时间内做到这一点吗?
我找到了 1 个解决方案。
我扩展了 StreamsBuilderFactoryBean:
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}
当我构建拓扑时,我没有使用 StreamsBuilder,而是使用 StreamsBuilderFactoryBean#getObject():
@Component
public class 动态流 {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
//build topology
}
//call this method when stream reconfiguration is needed
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}
}
我有一个基于 spring 引导、spring-kafka 和 kafka-streams 的应用程序。 当应用程序启动时,它会创建带有默认主题列表的 kafka 流拓扑。 我需要做的是在 运行 时间内完成 edit/recreate 拓扑。例如,当应用程序已经 运行ning 时,有新的主题名称出现,我想将这个主题添加到我的拓扑中。 目前我正在考虑以某种方式删除现有拓扑,关闭并清理 KafkaStreams,运行 我创建拓扑但使用新主题名称并再次启动 KafkaStreams 的逻辑。我不想重新启动我的应用程序。 有人可以建议我如何在 运行 时间内做到这一点吗?
我找到了 1 个解决方案。 我扩展了 StreamsBuilderFactoryBean:
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}
public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {
private StreamsBuilder instance;
public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
super(streamsConfig);
}
@Override
public boolean isSingleton() {
return false;
}
@Override
protected synchronized StreamsBuilder createInstance() {
if (instance == null) {
instance = new StreamsBuilder();
}
return instance;
}
@Override
public synchronized void stop() {
instance = null;
super.stop();
}
}
当我构建拓扑时,我没有使用 StreamsBuilder,而是使用 StreamsBuilderFactoryBean#getObject():
@Component
public class 动态流 {
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
public void init() {
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
//build topology
}
//call this method when stream reconfiguration is needed
public void reinitialize() {
streamsBuilderFactoryBean.stop();
init();
streamsBuilderFactoryBean.start();
}
}