Spring kafka 在运行时重新创建 Kafka Stream Topology

Spring kafka re-create Kafka Stream Topology in runtime

我有一个基于 spring 引导、spring-kafka 和 kafka-streams 的应用程序。 当应用程序启动时,它会创建带有默认主题列表的 kafka 流拓扑。 我需要做的是在 运行 时间内完成 edit/recreate 拓扑。例如,当应用程序已经 运行ning 时,有新的主题名称出现,我想将这个主题添加到我的拓扑中。 目前我正在考虑以某种方式删除现有拓扑,关闭并清理 KafkaStreams,运行 我创建拓扑但使用新主题名称并再次启动 KafkaStreams 的逻辑。我不想重新启动我的应用程序。 有人可以建议我如何在 运行 时间内做到这一点吗?

我找到了 1 个解决方案。 我扩展了 StreamsBuilderFactoryBean:

@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    @Override
    public synchronized void stop() {
        instance = null;
        super.stop();
    }
}

当我构建拓扑时,我没有使用 StreamsBuilder,而是使用 StreamsBuilderFactoryBean#getObject():

@Component

public class 动态流 {

private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

public void init() {
    StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        //build topology
}

//call this method when stream reconfiguration is needed
public void reinitialize() {
    streamsBuilderFactoryBean.stop();
    init();
    streamsBuilderFactoryBean.start();
}

}