Spring 具有不同 Key/Value 的云流中的多个输出绑定(AKA 分支)

Multiple Output Bindings (AKA Branching) In Spring Cloud Stream With Different Key/Value

我知道 Kafka Streams 允许根据指定的谓词将数据分发到多个主题,并且 Kafka Streams 绑定器使用 @StreamListenerfunctional binding 方法支持这一点。

...
// return type KStream<?, WordCount>[]

Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

return input.
    ... 
    branch(isEnglish, isFrench, isSpanish);

我想知道如何在返回数据之前转换其中一个分支的键或值。假设我希望其中一个分支具有与其他分支不同的密钥类型。

Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo =  (k, v) -> v.important.equals("2");

KStream<Key, Value>[] branches = input.branch(isOne, isTwo);

KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);

我想用两个流创建一个新的 KStream<?, Value>[] 数组,但由于通用数组创建错误而无法实现。

我知道这是可能的,从下面的文档摘录中可以看出,可以为每个分支的 producer.

指定不同的 key/value serdes
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde

感谢您的帮助。

一个选择是创建一个副主题。然后,每条与您发送到该副主题的 WordCount 不同的记录。 WordCount你留着的那个题目的记录还在分支

我根据 Spring cloud streams with KStream 的样本创建了这个代码示例。它不起作用,因为这个想法有效。我有一个使用不同 Order 对象并将错误的 Orders 发送到错误主题的模拟示例。

return input -> {
            KStream<?, WordCount> intermediateStream = input
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
                    .groupBy((key, value) -> value)
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
                    .count(Materialized.as("WordCounts-1"))
                    .toStream()
                    .map((key, value) -> new KeyValue<>(null,
                            new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));

            // here we return to the SIDE_TOPIC records with JsonSerde
            intermediateStream
                    .filter((k, v) -> `create another filter`)
                    .map((k, v) -> `transform only this stream`)
                    .to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));

            // here we keep using the branch serializer.
            intermediateStream.branch(isEnglish, isFrench, isSpanish);
        }

此用例是一种常见的方法,当您使用带有错误或空值的数据时,您希望将这些数据发送到副主题,即:错误主题。然后你仍然可以保存这些事件以备将来分析。