在 spring 云中使用 StreamBridge 或 kafkaStreams API 方法

Use StreamBridge or kafkaStreams API method in spring cloud

我正在使用 spring-cloud-stream 项目来使用 Kafka Streams,我是新手。由于推荐使用函数式编程,我定义了一个函数如下:

@Configuration
public class StreamConfiguration {

    @Bean
    public Consumer<KStream<String, PaymentRequest>> orderProcess() {
         return stream ->
                 stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                .branch(Named.as("RejectionCheck"), rejectionCheckPredicate, (key, value) -> true)
                ...
    }
}

关键是因为我在流中有多个分支并且每个分支路径都有唯一的目的地,所以无法使用 Function 来使用 orderProcess-out-0: destination: name。我注意到我可以像打击一样使用 Kafka Streams to(destination):

stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                ...
                .to("destination");

或像这样使用 StreamBridge

stream.mapValues(requestBuilderValueMapper, Named.as("ExtendedRequestBuilder"))
                .peek(paymentRequestDateValidatorAction, Named.as("DateValidator"))
                ...
                .peek((k,v) -> streamBridge("bindName-out-0", v));

哪种方法是处理这种情况的正确方法?

这两种方法是保留 Kafka Streams exactly_once_beta 事务模式还是破坏它?

我注意到的不同之处在于,第一种方法导致从 Kafka 代理 server.properties 推断出目标分区数,但后者使 spring 使用 application.yml 创建主题分区配置。

是的,对于第二个,Spring 从绑定属性中提供主题。

exactly once beta被第二个支持,只要broker是2.5或者更高,并且spring-kafka是2.6.x(或者2.5.x with容器的 EOSMode 设置为 BETA

它们在功能上是相同的。