如何使用 Spring Cloud Stream 和 Kafka 绑定 Store?
How to bind a Store using Spring Cloud Stream and Kafka?
我想在使用 Spring Cloud Stream 的 Kafka Binder 的示例应用程序中使用 KeyValueStore 类型的 Kafka 状态存储。
按照documentation,应该很简单。
这是我的主要 class:
@SpringBootApplication
public class KafkaStreamTestApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamTestApplication.class, args);
}
@Bean
public BiFunction<KStream<String, String>, KeyValueStore<String,String>, KStream<String, String>> process(){
return (input,store) -> input.mapValues(v -> v.toUpperCase());
}
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.String(),
Serdes.String());
}
}
我想应该将 KeyValueStore 作为 "process" 方法的第二个参数传递,但应用程序无法启动并显示以下消息:
Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.state.KeyValueStore among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:103) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1855) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1792) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
我在 Spring Cloud Stream 中找到了有关如何使用商店读取 unit test 的解决方案。
下面的代码是我如何将该解决方案应用到我的代码中的。
转换器使用 Spring bean 方法“myStore”
提供的 Store
@SpringBootApplication
public class KafkaStreamTestApplication {
public static final String MY_STORE_NAME = "my-store";
public static void main(String[] args) {
SpringApplication.run(KafkaStreamTestApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process2(){
return (input) -> input.
transformValues(() -> new MyValueTransformer(), MY_STORE_NAME);
}
@Bean
public StoreBuilder<?> myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(MY_STORE_NAME), Serdes.String(),
Serdes.String());
}
}
public class MyValueTransformer implements ValueTransformer<String, String> {
private KeyValueStore<String,String> store;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
store = (KeyValueStore<String, String>) this.context.getStateStore(KafkaStreamTestApplication.MY_STORE_NAME);
}
@Override
public String transform(String value) {
String tValue = store.get(value);
if(tValue==null) {
store.put(value, value.toUpperCase());
}
return tValue;
}
@Override
public void close() {
if(store!=null) {
store.close();
}
}
}
我想在使用 Spring Cloud Stream 的 Kafka Binder 的示例应用程序中使用 KeyValueStore 类型的 Kafka 状态存储。 按照documentation,应该很简单。 这是我的主要 class:
@SpringBootApplication
public class KafkaStreamTestApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamTestApplication.class, args);
}
@Bean
public BiFunction<KStream<String, String>, KeyValueStore<String,String>, KStream<String, String>> process(){
return (input,store) -> input.mapValues(v -> v.toUpperCase());
}
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.String(),
Serdes.String());
}
}
我想应该将 KeyValueStore 作为 "process" 方法的第二个参数传递,但应用程序无法启动并显示以下消息:
Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.state.KeyValueStore among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:103) ~[spring-cloud-stream-binder-kafka-streams-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1855) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1792) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
我在 Spring Cloud Stream 中找到了有关如何使用商店读取 unit test 的解决方案。
下面的代码是我如何将该解决方案应用到我的代码中的。 转换器使用 Spring bean 方法“myStore”
提供的 Store@SpringBootApplication
public class KafkaStreamTestApplication {
public static final String MY_STORE_NAME = "my-store";
public static void main(String[] args) {
SpringApplication.run(KafkaStreamTestApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process2(){
return (input) -> input.
transformValues(() -> new MyValueTransformer(), MY_STORE_NAME);
}
@Bean
public StoreBuilder<?> myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(MY_STORE_NAME), Serdes.String(),
Serdes.String());
}
}
public class MyValueTransformer implements ValueTransformer<String, String> {
private KeyValueStore<String,String> store;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
store = (KeyValueStore<String, String>) this.context.getStateStore(KafkaStreamTestApplication.MY_STORE_NAME);
}
@Override
public String transform(String value) {
String tValue = store.get(value);
if(tValue==null) {
store.put(value, value.toUpperCase());
}
return tValue;
}
@Override
public void close() {
if(store!=null) {
store.close();
}
}
}