Spring Kafka - 动态创建流
Spring Kafka - create streams dynamically
我需要从配置文件动态创建 kafka 流,其中包含每个流的源主题名称和配置。应用程序需要有数十个 kafka 流,并且每个环境(例如,stage、prod)上的流都会不同。
spring-kafka
库可以做到这一点吗?
我们可以用 kafka-streams
轻松做到这一点:
@Bean
public List<KafkaStreams> kafkaStreams() {
return streamRouteProperties.stream()
.map(routeProperty -> createKafkaStream(routeProperty))
.collect(toList());
}
private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
return new KafkaStreams(topology, streamsConfig);
}
我们需要实现 spring SmartLifecycle
接口,所以所有流都会自动启动和关闭。
是否可以使用 spring-kafka
做同样的事情?
如我所见,我们需要在代码中创建每个 kafka 流,而且我看不到如何使用 StreamsBuilderFactoryBean
创建 kafka 流列表的可能性。
对于每个必需的流,我需要执行以下操作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
Consumed<String, String> consumed = ..;
KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
kStream.process(() -> eventProcessor);
return kStream;
}
@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
return new StreamsBuilderFactoryBean(streamsConfig);
}
但是如何使用 StreamsBuilderFactoryBean
动态创建 kafka 流列表?
StreamsBuilderFactoryBean
只是为 Spring 应用程序上下文带来了一些自以为是的、方便的 API,但这并不意味着您总是应该与它联系在一起。
幸运的是,StreamsBuilderFactoryBean
与常规 Kafka Streams 相比并没有太多价值。它的最大作用是对内部创建的生命周期控制 KafkaStreams
。
您可以随意使用原始的 Kafka Streams API,不要根据真正设计的 StreamsBuilderFactoryBean
使其成为您的要求而使您的代码过于复杂对于一组静态选项。
我需要从配置文件动态创建 kafka 流,其中包含每个流的源主题名称和配置。应用程序需要有数十个 kafka 流,并且每个环境(例如,stage、prod)上的流都会不同。
spring-kafka
库可以做到这一点吗?
我们可以用 kafka-streams
轻松做到这一点:
@Bean
public List<KafkaStreams> kafkaStreams() {
return streamRouteProperties.stream()
.map(routeProperty -> createKafkaStream(routeProperty))
.collect(toList());
}
private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
StreamsBuilder builder = new StreamsBuilder();
KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
return new KafkaStreams(topology, streamsConfig);
}
我们需要实现 spring SmartLifecycle
接口,所以所有流都会自动启动和关闭。
是否可以使用 spring-kafka
做同样的事情?
如我所见,我们需要在代码中创建每个 kafka 流,而且我看不到如何使用 StreamsBuilderFactoryBean
创建 kafka 流列表的可能性。
对于每个必需的流,我需要执行以下操作:
@Bean
public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
Consumed<String, String> consumed = ..;
KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
kStream.process(() -> eventProcessor);
return kStream;
}
@Bean
public FactoryBean<StreamsBuilder> streamsBuilder() {
return new StreamsBuilderFactoryBean(streamsConfig);
}
但是如何使用 StreamsBuilderFactoryBean
动态创建 kafka 流列表?
StreamsBuilderFactoryBean
只是为 Spring 应用程序上下文带来了一些自以为是的、方便的 API,但这并不意味着您总是应该与它联系在一起。
幸运的是,StreamsBuilderFactoryBean
与常规 Kafka Streams 相比并没有太多价值。它的最大作用是对内部创建的生命周期控制 KafkaStreams
。
您可以随意使用原始的 Kafka Streams API,不要根据真正设计的 StreamsBuilderFactoryBean
使其成为您的要求而使您的代码过于复杂对于一组静态选项。