使用 spring-cloud-stream-binder-kafka 将 GlobalStateStore 绑定到处理器中
Binding GlobalStateStore into Processor with spring-cloud-stream-binder-kafka
初始问题:
我有一个问题,如何将我的 GlobalStateStore 绑定到处理器。我的应用程序有一个 GlobalStateStore 和一个自己的处理器(“GlobalConfigProcessor”)来保持商店最新。此外,我还有另一个处理器(“MyClassProcessor”),它在我的消费者函数中被调用。现在,我尝试从 MyClassProcessor 访问商店,但出现异常:无效拓扑:StateStore config_statestore 尚未添加。
最新情况:
我设置了一个测试存储库以更好地了解我的情况。这可以在这里找到:https://github.com/fx42/store-example
正如您在 repo 中看到的,我有两个消费者,它们都使用不同的主题。 Config-Topic 提供了一个我想写入 GlobalStateStore 的事件。这是涉及的 StateStoreUpdateConsumer.java 和 StateStoreProcessor.java。
使用 MyClassEventConsumer.java 我处理另一个输入主题并希望从 GlobalStateStore 中读取值。
正如此 doc 中所提供的,我无法像初始化 StateStoreBean 一样初始化 GlobalStateStores,而是必须使用 StreamsBuilderFactoryBeanCustomizer Bean 主动添加它。此代码目前在 StreamConfig.java 中被注释掉了。没有这段代码我得到异常
org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
如果代码正在使用中,我会得到异常:
org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
所以这导致我的决定,我有一个配置问题,所以拓扑被搞砸了。
问题:
- 当我直接通过
为 GlobalStateStore 提供处理器时
streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));
我是否必须为此处理器提供消费者函数,或者我什至必须在函数 configuration/application.yml 中提及它?
有没有办法不在 addGlobalStore
调用中提供 ProcessorSupplier 而只使用功能方式?
如果两个定义的函数有两种不同的拓扑,我该如何处理这个 GlobalStateStore?
这是我用来将 GlobalStateStore 添加到 FactoryBean 的注释掉的 StreamBuilderFactoryCustomizer Bean:
@Bean
StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(
StoreBuilder<KeyValueStore<String, String>> storeBuilder) {
return factoryBean -> {
try {
var streamBuilder = factoryBean.getObject();
streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));
} catch (Exception e) {
e.printStackTrace();
}
};
} };
}
我想通了我的问题。对我来说,这是我使用的@EnableKafkaStreams 注释。我认为这就是我同时拥有两个不同上下文 运行 并且它们发生冲突的原因。此外,我需要使用 StreamsBuilderFactoryBeanConfigurer
而不是 StreamsBuilderFactoryBeanCustomizer
来正确注册 GlobalStateStore。
这些更改在链接 test-repo 中完成,现在可以正确启动应用程序上下文。
初始问题: 我有一个问题,如何将我的 GlobalStateStore 绑定到处理器。我的应用程序有一个 GlobalStateStore 和一个自己的处理器(“GlobalConfigProcessor”)来保持商店最新。此外,我还有另一个处理器(“MyClassProcessor”),它在我的消费者函数中被调用。现在,我尝试从 MyClassProcessor 访问商店,但出现异常:无效拓扑:StateStore config_statestore 尚未添加。
最新情况: 我设置了一个测试存储库以更好地了解我的情况。这可以在这里找到:https://github.com/fx42/store-example
正如您在 repo 中看到的,我有两个消费者,它们都使用不同的主题。 Config-Topic 提供了一个我想写入 GlobalStateStore 的事件。这是涉及的 StateStoreUpdateConsumer.java 和 StateStoreProcessor.java。 使用 MyClassEventConsumer.java 我处理另一个输入主题并希望从 GlobalStateStore 中读取值。 正如此 doc 中所提供的,我无法像初始化 StateStoreBean 一样初始化 GlobalStateStores,而是必须使用 StreamsBuilderFactoryBeanCustomizer Bean 主动添加它。此代码目前在 StreamConfig.java 中被注释掉了。没有这段代码我得到异常
org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
如果代码正在使用中,我会得到异常:
org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
所以这导致我的决定,我有一个配置问题,所以拓扑被搞砸了。
问题:
- 当我直接通过 为 GlobalStateStore 提供处理器时
streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));
我是否必须为此处理器提供消费者函数,或者我什至必须在函数 configuration/application.yml 中提及它?
有没有办法不在
addGlobalStore
调用中提供 ProcessorSupplier 而只使用功能方式?如果两个定义的函数有两种不同的拓扑,我该如何处理这个 GlobalStateStore?
这是我用来将 GlobalStateStore 添加到 FactoryBean 的注释掉的 StreamBuilderFactoryCustomizer Bean:
@Bean
StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(
StoreBuilder<KeyValueStore<String, String>> storeBuilder) {
return factoryBean -> {
try {
var streamBuilder = factoryBean.getObject();
streamBuilder.addGlobalStore(storeBuilder, configInputTopic,
Consumed.with(Serdes.String(), Serdes.String()), () -> new StateStoreProcessor(statestoreName));
} catch (Exception e) {
e.printStackTrace();
}
};
} };
}
我想通了我的问题。对我来说,这是我使用的@EnableKafkaStreams 注释。我认为这就是我同时拥有两个不同上下文 运行 并且它们发生冲突的原因。此外,我需要使用 StreamsBuilderFactoryBeanConfigurer
而不是 StreamsBuilderFactoryBeanCustomizer
来正确注册 GlobalStateStore。
这些更改在链接 test-repo 中完成,现在可以正确启动应用程序上下文。