如何在 Spring Cloud Kafka Streams 中以函数式的方式做这个拓扑?
How to do this topology in Spring Cloud Kafka Streams in function style?
var streamsBuilder = new StreamsBuilder();
KStream<String, String> inputStream = streamsBuilder.stream("input_topic");
KStream<String, String> upperCaseString =
inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
upperCaseString.mapValues(v -> v + "postfix").to("with_postfix_topic");
upperCaseString.mapValues(v -> "prefix" + v).to("with_prefix_topic");
Topology topology = streamsBuilder.build();
我可以写三个函数bean。第一个 bean 将大写并将结果写入某个主题 ('upper_case_topic')。其他 bean 将使用此结果(来自 'upper_case_topic')并添加 prefix/postfix。但是不写中间话题怎么办呢('upper_case_topic')?
更新:
这是我可能的解决方案:
@Bean
public Consumer<KStream<String, String>> process() {
return input -> {
KStream<String, String> upperCaseStream =
input.mapValues((ValueMapper<String, String>) String::toUpperCase);
upperCaseStream.mapValues(v -> v + " 111").to("new_topic_1");
upperCaseStream.mapValues(v -> v + " 222").to("new_topic_2");
};
}
您可以尝试以下选项。
选项 1 - 使用来自 Spring Cloud Stream
的 StreamBridge
@Bean
public Consumer<KStream<String, String>> process() {
KStream<String, String> upperCaseString =
inputStream.mapValues((ValueMapper<String, String>)
String::toUpperCase);
upperCaseString.foreach((key, value) -> {
streamBridge.send("with_postfix_topic", value + "postfix");
streamBridge.send("with_prefix_topic", "prefix" + value);
});
}
上述方法的一个方面是您需要来自 Spring Cloud Stream 的 Kafka 和 Kafka Streams 绑定器才能使其工作。另一个问题是,当您直接发送到业务逻辑中的 Kafka 主题时,您会失去 Kafka Streams 原生提供的端到端语义。根据您的用例,这种方法可能没问题。
选项 2 - 在出站上使用 KStream[]
您通常在具有 Kafka Streams API 分支功能的出站上使用 KStream[]
,但您可以利用 Spring Cloud Stream 在其之上构建的输出绑定功能分支功能作为您的用例的解决方法。这是一个您可以尝试的想法。
@Bean
public Function<KStream<String, String>, KStream<String, String>[]> process() {
return inputStream -> {
KStream<String, String> upperCaseString =
inputStream.mapValues((ValueMapper<String, String>)
String::toUpperCase);
KStream<String, String>[] kStreams = new KStream[2];
kStreams[0] = upperCaseString.mapValues(v -> v + "postfix");
kStreams[1] = upperCaseString.mapValues(v -> v + "postfix");
return kStreams;
};
}
然后你可以定义你的目的地如下:
spring.cloud.stream.bindings.process-in-0.destination: <input-topic-name>
spring.cloud.stream.bindings.process-out-0.destination: <output-topic-name>
spring.cloud.stream.bindings.process-out-1.destination: <output-topic-name>
使用这种方法,您可以从 Kafka Streams 获得端到端的语义,因为发送到 Kafka 的主题是通过 Kafka Streams 处理的,方法是在幕后调用 KStream
上的 to
方法通过活页夹。
选项 3 - 使用函数组合
您的另一个选择是 Kafka Streams 活页夹中的函数组合。请记住,此功能尚未发布,但活页夹的最新 3.1.x/3.2.x 快照已提供此功能。这样,您可以定义三个简单的函数,如下所示。
@Bean
public Function<KStream<String, String>, KStream<String, String>> uppercase() {
return inputStream -> inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> postfixed() {
return inputStream -> inputStream.mapValues(v -> v + "postfix");
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> prefixed() {
return inputStream -> inputStream.mapValues(v -> "prefix" + v);
}
然后你可以有两个函数组合流程如下:
spring.cloud.function.definition: uppercase|postfixed;uppercase|prefixed
您可以在每个组合函数绑定上设置输入主题,如下所示。
spring.cloud.stream.bindings.uppercasepostfixed-in-0.destination=<input-topic>
spring.cloud.stream.bindings.uppercaseprefixed-in-0.destination=<input-topic>
通过这种函数组合方法,您可以从 Kafka Streams 获得端到端的语义,并避免额外的中间主题。这里的缺点是 uppercase
函数将为每个传入记录调用两次。
上述方法可行,但在将它们用于您的用例之前请考虑权衡取舍。
var streamsBuilder = new StreamsBuilder();
KStream<String, String> inputStream = streamsBuilder.stream("input_topic");
KStream<String, String> upperCaseString =
inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
upperCaseString.mapValues(v -> v + "postfix").to("with_postfix_topic");
upperCaseString.mapValues(v -> "prefix" + v).to("with_prefix_topic");
Topology topology = streamsBuilder.build();
我可以写三个函数bean。第一个 bean 将大写并将结果写入某个主题 ('upper_case_topic')。其他 bean 将使用此结果(来自 'upper_case_topic')并添加 prefix/postfix。但是不写中间话题怎么办呢('upper_case_topic')?
更新: 这是我可能的解决方案:
@Bean
public Consumer<KStream<String, String>> process() {
return input -> {
KStream<String, String> upperCaseStream =
input.mapValues((ValueMapper<String, String>) String::toUpperCase);
upperCaseStream.mapValues(v -> v + " 111").to("new_topic_1");
upperCaseStream.mapValues(v -> v + " 222").to("new_topic_2");
};
}
您可以尝试以下选项。
选项 1 - 使用来自 Spring Cloud Stream
的StreamBridge
@Bean
public Consumer<KStream<String, String>> process() {
KStream<String, String> upperCaseString =
inputStream.mapValues((ValueMapper<String, String>)
String::toUpperCase);
upperCaseString.foreach((key, value) -> {
streamBridge.send("with_postfix_topic", value + "postfix");
streamBridge.send("with_prefix_topic", "prefix" + value);
});
}
上述方法的一个方面是您需要来自 Spring Cloud Stream 的 Kafka 和 Kafka Streams 绑定器才能使其工作。另一个问题是,当您直接发送到业务逻辑中的 Kafka 主题时,您会失去 Kafka Streams 原生提供的端到端语义。根据您的用例,这种方法可能没问题。
选项 2 - 在出站上使用 KStream[]
您通常在具有 Kafka Streams API 分支功能的出站上使用 KStream[]
,但您可以利用 Spring Cloud Stream 在其之上构建的输出绑定功能分支功能作为您的用例的解决方法。这是一个您可以尝试的想法。
@Bean
public Function<KStream<String, String>, KStream<String, String>[]> process() {
return inputStream -> {
KStream<String, String> upperCaseString =
inputStream.mapValues((ValueMapper<String, String>)
String::toUpperCase);
KStream<String, String>[] kStreams = new KStream[2];
kStreams[0] = upperCaseString.mapValues(v -> v + "postfix");
kStreams[1] = upperCaseString.mapValues(v -> v + "postfix");
return kStreams;
};
}
然后你可以定义你的目的地如下:
spring.cloud.stream.bindings.process-in-0.destination: <input-topic-name>
spring.cloud.stream.bindings.process-out-0.destination: <output-topic-name>
spring.cloud.stream.bindings.process-out-1.destination: <output-topic-name>
使用这种方法,您可以从 Kafka Streams 获得端到端的语义,因为发送到 Kafka 的主题是通过 Kafka Streams 处理的,方法是在幕后调用 KStream
上的 to
方法通过活页夹。
选项 3 - 使用函数组合
您的另一个选择是 Kafka Streams 活页夹中的函数组合。请记住,此功能尚未发布,但活页夹的最新 3.1.x/3.2.x 快照已提供此功能。这样,您可以定义三个简单的函数,如下所示。
@Bean
public Function<KStream<String, String>, KStream<String, String>> uppercase() {
return inputStream -> inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> postfixed() {
return inputStream -> inputStream.mapValues(v -> v + "postfix");
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> prefixed() {
return inputStream -> inputStream.mapValues(v -> "prefix" + v);
}
然后你可以有两个函数组合流程如下:
spring.cloud.function.definition: uppercase|postfixed;uppercase|prefixed
您可以在每个组合函数绑定上设置输入主题,如下所示。
spring.cloud.stream.bindings.uppercasepostfixed-in-0.destination=<input-topic>
spring.cloud.stream.bindings.uppercaseprefixed-in-0.destination=<input-topic>
通过这种函数组合方法,您可以从 Kafka Streams 获得端到端的语义,并避免额外的中间主题。这里的缺点是 uppercase
函数将为每个传入记录调用两次。
上述方法可行,但在将它们用于您的用例之前请考虑权衡取舍。