无法使用 Spring Cloud Stream Binder Kafka 3.x 将自定义存储连接到 Transformer

Cannot get custom store connected to a Transformer with Spring Cloud Stream Binder Kafka 3.x

无法在 Spring Cloud Stream Binder Kafka 3.x 中将自定义存储连接到我的 Transformer 3.x(功能样式)遵循 here.

中的示例

我将 KeyValueStore 定义为类型为 StoreBuilder<KeyValueStore<String,Long>>:

的 @Bean
@Bean
public StoreBuilder<KeyValueStore<String,Long>> myStore() {
    return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), Serdes.String(),
        Serdes.Long());
}

@Bean
@DependsOn({"myStore"})
public MyTransformer myTransformer() {
    return new MyTransformer("my-store");
}

在调试器中,我可以看到 bean 已初始化。 然后在我的流处理器函数中:

    return myStream ->  {
        return myStream
            .peek(..)
            .transform(() -> myTransformer())
            ...

MyTransformer 声明为

public class MyTransformer implements Transformer<String, MyEvent, KeyValue<KeyValue<String,Long>, MyEvent>> {
...
@Override
public void init(final ProcessorContext context) {
    this.context = context;
    this.myStore = context.getStateStore(storeName);
}

当应用程序上下文从我的单元测试启动时出现以下错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-TRANSFORM-0000000002 has no access to StateStore my-store as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.

在 运行 我的单元测试时的应用程序启动日志中,我可以看到商店似乎已创建:

2021-04-06 00:44:43.806  INFO [           main] .k.s.AbstractKafkaStreamsBinderProcessor : state store my-store added to topology

我已经在我的应用程序和单元测试中使用了 Spring Cloud Stream Binder Kafka 的几乎所有功能,一切都运行良好。出乎意料的是,我在将自定义 KeyValueStore 添加到我的 Transformer 时卡住了。如果您能发现我的设置中的错误,那就太好了。

我现在使用的版本:

org.springframework.boot:spring-boot:jar:2.4.4
org.springframework.kafka:spring-kafka:jar:2.6.7
org.springframework.kafka:spring-kafka-test:jar:2.6.7
org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:jar:3.0.4.RELEASE
org.apache.kafka:kafka-streams:jar:2.7.0

我刚试过

org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:jar:3.1.3-SNAPSHOT

问题似乎仍然存在。

在您的处理器函数中,当您调用 .transform(() -> myTransformer()) 时,您还需要提供状态存储名称,以便将其连接到该转换器。 KStream API 中有一些重载的 transform 方法将状态存储名称作为可变参数。我想知道这是否是您 运行 关注的问题。您可能希望将该调用更改为 .transform(() -> myTransformer(), "myStore").