Spring 具有不同 Key/Value 的云流中的多个输出绑定(AKA 分支)
Multiple Output Bindings (AKA Branching) In Spring Cloud Stream With Different Key/Value
我知道 Kafka Streams 允许根据指定的谓词将数据分发到多个主题,并且 Kafka Streams 绑定器使用 @StreamListener
和 functional binding
方法支持这一点。
...
// return type KStream<?, WordCount>[]
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input.
...
branch(isEnglish, isFrench, isSpanish);
我想知道如何在返回数据之前转换其中一个分支的键或值。假设我希望其中一个分支具有与其他分支不同的密钥类型。
Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo = (k, v) -> v.important.equals("2");
KStream<Key, Value>[] branches = input.branch(isOne, isTwo);
KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);
我想用两个流创建一个新的 KStream<?, Value>[]
数组,但由于通用数组创建错误而无法实现。
我知道这是可能的,从下面的文档摘录中可以看出,可以为每个分支的 producer
.
指定不同的 key/value serdes
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde
感谢您的帮助。
一个选择是创建一个副主题。然后,每条与您发送到该副主题的 WordCount
不同的记录。 WordCount
你留着的那个题目的记录还在分支
我根据 Spring cloud streams with KStream 的样本创建了这个代码示例。它不起作用,因为这个想法有效。我有一个使用不同 Order
对象并将错误的 Orders
发送到错误主题的模拟示例。
return input -> {
KStream<?, WordCount> intermediateStream = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null,
new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
// here we return to the SIDE_TOPIC records with JsonSerde
intermediateStream
.filter((k, v) -> `create another filter`)
.map((k, v) -> `transform only this stream`)
.to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));
// here we keep using the branch serializer.
intermediateStream.branch(isEnglish, isFrench, isSpanish);
}
此用例是一种常见的方法,当您使用带有错误或空值的数据时,您希望将这些数据发送到副主题,即:错误主题。然后你仍然可以保存这些事件以备将来分析。
我知道 Kafka Streams 允许根据指定的谓词将数据分发到多个主题,并且 Kafka Streams 绑定器使用 @StreamListener
和 functional binding
方法支持这一点。
...
// return type KStream<?, WordCount>[]
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input.
...
branch(isEnglish, isFrench, isSpanish);
我想知道如何在返回数据之前转换其中一个分支的键或值。假设我希望其中一个分支具有与其他分支不同的密钥类型。
Predicate<Key, Value> isOne = (k, v) -> v.important.equals("1");
Predicate<Key, Value> isTwo = (k, v) -> v.important.equals("2");
KStream<Key, Value>[] branches = input.branch(isOne, isTwo);
KStream<String, Value> one = branches[0].selectKey((k, v) -> v.importantValue);
我想用两个流创建一个新的 KStream<?, Value>[]
数组,但由于通用数组创建错误而无法实现。
我知道这是可能的,从下面的文档摘录中可以看出,可以为每个分支的 producer
.
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde
感谢您的帮助。
一个选择是创建一个副主题。然后,每条与您发送到该副主题的 WordCount
不同的记录。 WordCount
你留着的那个题目的记录还在分支
我根据 Spring cloud streams with KStream 的样本创建了这个代码示例。它不起作用,因为这个想法有效。我有一个使用不同 Order
对象并将错误的 Orders
发送到错误主题的模拟示例。
return input -> {
KStream<?, WordCount> intermediateStream = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(6)))
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null,
new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
// here we return to the SIDE_TOPIC records with JsonSerde
intermediateStream
.filter((k, v) -> `create another filter`)
.map((k, v) -> `transform only this stream`)
.to(SIDE_TOPIC, Produced.with(CustomSerdes.String(), new JsonSerde(....)));
// here we keep using the branch serializer.
intermediateStream.branch(isEnglish, isFrench, isSpanish);
}
此用例是一种常见的方法,当您使用带有错误或空值的数据时,您希望将这些数据发送到副主题,即:错误主题。然后你仍然可以保存这些事件以备将来分析。