使用 spring-kafka 从 KTable 读取 ReadOnlyKeyValueStore
ReadOnlyKeyValueStore from a KTable using spring-kafka
我正在迁移一个使用纯 Kafka api 的 Kafka Streams 实现来使用 spring-kafka,因为它被合并到 spring-boot 应用程序中。
一切正常,我拥有的 Stream、GlobalKTable、分支都工作得很好,但我很难合并 ReadOnlyKeyValueStore。基于此处的 spring-kafka 文档:https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#streams-spring
它说:
If you need to perform some KafkaStreams operations directly, you can
access that internal KafkaStreams instance by using
StreamsBuilderFactoryBean.getKafkaStreams(). You can autowire
StreamsBuilderFactoryBean bean by type, but you should be sure to use
the full type in the bean definition.
基于此,我尝试将其合并到我的示例中,如下面的片段所示:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = defaultStreamsConfigs();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quote-stream");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-quotes-stream-group");
return new KafkaStreamsConfiguration(props);
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration defaultKafkaStreamsConfig) {
return new StreamsBuilderFactoryBean(defaultKafkaStreamsConfig);
}
...
final GlobalKTable<String, LeveragePrice> leverageBySymbolGKTable = streamsBuilder
.globalTable(KafkaConfiguration.LEVERAGE_PRICE_TOPIC,
Materialized.<String, LeveragePrice, KeyValueStore<Bytes, byte[]>>as("leverage-by-symbol-table")
.withKeySerde(Serdes.String())
.withValueSerde(leveragePriceSerde));
leveragePriceView = myKStreamsBuilder.getKafkaStreams().store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
但是添加 StreamsBuilderFactoryBean(似乎需要它来获取对 KafkaStreams 的引用)定义会导致错误:
The bean 'defaultKafkaStreamsBuilder', defined in class path resource [com/resona/springkafkastream/repository/KafkaConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] and overriding is disabled.
问题是我不想控制流的生命周期,这是我通过普通 Kafka API 获得的生命周期,所以我想根据需要获得对默认托管流的引用 spring管理它,但每当我尝试公开 bean 时,它都会出错。关于使用 spring-kafka 的正确方法是什么的任何想法?
P.S - 我对使用 spring-cloud-stream 的解决方案不感兴趣 我正在寻找 spring-kafka 的实现。
你不需要定义任何新的bean;像这样的东西应该有用...
spring.application.name=quote-stream
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
@SpringBootApplication
@EnableKafkaStreams
public class So69669791Application {
public static void main(String[] args) {
SpringApplication.run(So69669791Application.class, args);
}
@Bean
GlobalKTable<String, String> leverageBySymbolGKTable(StreamsBuilder sb) {
return sb.globalTable("gkTopic",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>> as("leverage-by-symbol-table"));
}
private ReadOnlyKeyValueStore<String, String> leveragePriceView;
@Bean
StreamsBuilderFactoryBean.Listener afterStart(StreamsBuilderFactoryBean sbfb,
GlobalKTable<String, String> leverageBySymbolGKTable) {
StreamsBuilderFactoryBean.Listener listener = new StreamsBuilderFactoryBean.Listener() {
@Override
public void streamsAdded(String id, KafkaStreams streams) {
leveragePriceView = streams.store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
}
};
sbfb.addListener(listener);
return listener;
}
@Bean
KStream<String, String> stream(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("someTopic");
stream.to("otherTopic");
return stream;
}
}
我正在迁移一个使用纯 Kafka api 的 Kafka Streams 实现来使用 spring-kafka,因为它被合并到 spring-boot 应用程序中。
一切正常,我拥有的 Stream、GlobalKTable、分支都工作得很好,但我很难合并 ReadOnlyKeyValueStore。基于此处的 spring-kafka 文档:https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#streams-spring
它说:
If you need to perform some KafkaStreams operations directly, you can access that internal KafkaStreams instance by using StreamsBuilderFactoryBean.getKafkaStreams(). You can autowire StreamsBuilderFactoryBean bean by type, but you should be sure to use the full type in the bean definition.
基于此,我尝试将其合并到我的示例中,如下面的片段所示:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = defaultStreamsConfigs();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quote-stream");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-quotes-stream-group");
return new KafkaStreamsConfiguration(props);
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration defaultKafkaStreamsConfig) {
return new StreamsBuilderFactoryBean(defaultKafkaStreamsConfig);
}
...
final GlobalKTable<String, LeveragePrice> leverageBySymbolGKTable = streamsBuilder
.globalTable(KafkaConfiguration.LEVERAGE_PRICE_TOPIC,
Materialized.<String, LeveragePrice, KeyValueStore<Bytes, byte[]>>as("leverage-by-symbol-table")
.withKeySerde(Serdes.String())
.withValueSerde(leveragePriceSerde));
leveragePriceView = myKStreamsBuilder.getKafkaStreams().store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
但是添加 StreamsBuilderFactoryBean(似乎需要它来获取对 KafkaStreams 的引用)定义会导致错误:
The bean 'defaultKafkaStreamsBuilder', defined in class path resource [com/resona/springkafkastream/repository/KafkaConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] and overriding is disabled.
问题是我不想控制流的生命周期,这是我通过普通 Kafka API 获得的生命周期,所以我想根据需要获得对默认托管流的引用 spring管理它,但每当我尝试公开 bean 时,它都会出错。关于使用 spring-kafka 的正确方法是什么的任何想法?
P.S - 我对使用 spring-cloud-stream 的解决方案不感兴趣 我正在寻找 spring-kafka 的实现。
你不需要定义任何新的bean;像这样的东西应该有用...
spring.application.name=quote-stream
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
@SpringBootApplication
@EnableKafkaStreams
public class So69669791Application {
public static void main(String[] args) {
SpringApplication.run(So69669791Application.class, args);
}
@Bean
GlobalKTable<String, String> leverageBySymbolGKTable(StreamsBuilder sb) {
return sb.globalTable("gkTopic",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>> as("leverage-by-symbol-table"));
}
private ReadOnlyKeyValueStore<String, String> leveragePriceView;
@Bean
StreamsBuilderFactoryBean.Listener afterStart(StreamsBuilderFactoryBean sbfb,
GlobalKTable<String, String> leverageBySymbolGKTable) {
StreamsBuilderFactoryBean.Listener listener = new StreamsBuilderFactoryBean.Listener() {
@Override
public void streamsAdded(String id, KafkaStreams streams) {
leveragePriceView = streams.store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
}
};
sbfb.addListener(listener);
return listener;
}
@Bean
KStream<String, String> stream(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("someTopic");
stream.to("otherTopic");
return stream;
}
}