使用 spring-cloud-stream-binder-kafka 将 GlobalStateStore 绑定到处理器中

Binding GlobalStateStore into Processor with spring-cloud-stream-binder-kafka

初始问题: 我有一个问题,如何将我的 GlobalStateStore 绑定到处理器。我的应用程序有一个 GlobalStateStore 和一个自己的处理器(“GlobalConfigProcessor”)来保持商店最新。此外,我还有另一个处理器(“MyClassProcessor”),它在我的消费者函数中被调用。现在,我尝试从 MyClassProcessor 访问商店,但出现异常:无效拓扑:StateStore config_statestore 尚未添加。

最新情况: 我设置了一个测试存储库以更好地了解我的情况。这可以在这里找到:https://github.com/fx42/store-example

正如您在 repo 中看到的,我有两个消费者,它们都使用不同的主题。 Config-Topic 提供了一个我想写入 GlobalStateStore 的事件。这是涉及的 StateStoreUpdateConsumer.java 和 StateStoreProcessor.java。 使用 MyClassEventConsumer.java 我处理另一个输入主题并希望从 GlobalStateStore 中读取值。 正如此 doc 中所提供的,我无法像初始化 StateStoreBean 一样初始化 GlobalStateStores,而是必须使用 StreamsBuilderFactoryBeanCustomizer Bean 主动添加它。此代码目前在 StreamConfig.java 中被注释掉了。没有这段代码我得到异常

org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.

如果代码正在使用中,我会得到异常:

org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory

所以这导致我的决定,我有一个配置问题,所以拓扑被搞砸了。

问题:

  1. 当我直接通过
  2. 为 GlobalStateStore 提供处理器时
streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
                            Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));

我是否必须为此处理器提供消费者函数,或者我什至必须在函数 configuration/application.yml 中提及它?

  1. 有没有办法不在 addGlobalStore 调用中提供 ProcessorSupplier 而只使用功能方式?

  2. 如果两个定义的函数有两种不同的拓扑,我该如何处理这个 GlobalStateStore?

这是我用来将 GlobalStateStore 添加到 FactoryBean 的注释掉的 StreamBuilderFactoryCustomizer Bean:

@Bean
StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(
            StoreBuilder<KeyValueStore<String, String>> storeBuilder) {

        return factoryBean -> {
            try {
                var streamBuilder = factoryBean.getObject();
                streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
                        Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));

            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    } };
}

我想通了我的问题。对我来说,这是我使用的@EnableKafkaStreams 注释。我认为这就是我同时拥有两个不同上下文 运行 并且它们发生冲突的原因。此外,我需要使用 StreamsBuilderFactoryBeanConfigurer 而不是 StreamsBuilderFactoryBeanCustomizer 来正确注册 GlobalStateStore。 这些更改在链接 test-repo 中完成,现在可以正确启动应用程序上下文。