如何在 Spring Cloud Kafka Streams 中以函数式的方式做这个拓扑?

How to do this topology in Spring Cloud Kafka Streams in function style?

var streamsBuilder = new StreamsBuilder();
    KStream<String, String> inputStream = streamsBuilder.stream("input_topic");

KStream<String, String> upperCaseString =
        inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);

upperCaseString.mapValues(v -> v + "postfix").to("with_postfix_topic");
upperCaseString.mapValues(v -> "prefix" + v).to("with_prefix_topic");

Topology topology = streamsBuilder.build();

我可以写三个函数bean。第一个 bean 将大写并将结果写入某个主题 ('upper_case_topic')。其他 bean 将使用此结果(来自 'upper_case_topic')并添加 prefix/postfix。但是不写中间话题怎么办呢('upper_case_topic')?

更新: 这是我可能的解决方案:

@Bean
public Consumer<KStream<String, String>> process() {
    return input -> {
        KStream<String, String> upperCaseStream =
                input.mapValues((ValueMapper<String, String>) String::toUpperCase);

        upperCaseStream.mapValues(v -> v + " 111").to("new_topic_1");

        upperCaseStream.mapValues(v -> v + " 222").to("new_topic_2");
    };
}

您可以尝试以下选项。

选项 1 - 使用来自 Spring Cloud Stream

StreamBridge
@Bean
public Consumer<KStream<String, String>> process() {

  KStream<String, String> upperCaseString =
        inputStream.mapValues((ValueMapper<String, String>) 
    String::toUpperCase);

  upperCaseString.foreach((key, value) -> {
                streamBridge.send("with_postfix_topic", value + "postfix");
                streamBridge.send("with_prefix_topic", "prefix" + value);
            });
}

上述方法的一个方面是您需要来自 Spring Cloud Stream 的 Kafka 和 Kafka Streams 绑定器才能使其工作。另一个问题是,当您直接发送到业务逻辑中的 Kafka 主题时,您会失去 Kafka Streams 原生提供的端到端语义。根据您的用例,这种方法可能没问题。

选项 2 - 在出站上使用 KStream[]

您通常在具有 Kafka Streams API 分支功能的出站上使用 KStream[],但您可以利用 Spring Cloud Stream 在其之上构建的输出绑定功能分支功能作为您的用例的解决方法。这是一个您可以尝试的想法。

@Bean
public Function<KStream<String, String>, KStream<String, String>[]> process() {
  return inputStream -> {
                KStream<String, String> upperCaseString =
                        inputStream.mapValues((ValueMapper<String, String>)
                                String::toUpperCase);
                KStream<String, String>[] kStreams = new KStream[2];
                kStreams[0] = upperCaseString.mapValues(v -> v + "postfix");
                kStreams[1] = upperCaseString.mapValues(v -> v + "postfix");
                return kStreams;
            };
}

然后你可以定义你的目的地如下:

spring.cloud.stream.bindings.process-in-0.destination: <input-topic-name>
spring.cloud.stream.bindings.process-out-0.destination: <output-topic-name>
spring.cloud.stream.bindings.process-out-1.destination: <output-topic-name>

使用这种方法,您可以从 Kafka Streams 获得端到端的语义,因为发送到 Kafka 的主题是通过 Kafka Streams 处理的,方法是在幕后调用 KStream 上的 to 方法通过活页夹。

选项 3 - 使用函数组合

您的另一个选择是 Kafka Streams 活页夹中的函数组合。请记住,此功能尚未发布,但活页夹的最新 3.1.x/3.2.x 快照已提供此功能。这样,您可以定义三个简单的函数,如下所示。

@Bean
public Function<KStream<String, String>, KStream<String, String>> uppercase() {
  return inputStream -> inputStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> postfixed() {
  return inputStream -> inputStream.mapValues(v -> v + "postfix");
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> prefixed() { 
  return inputStream -> inputStream.mapValues(v -> "prefix" + v);
}

然后你可以有两个函数组合流程如下:

spring.cloud.function.definition: uppercase|postfixed;uppercase|prefixed

您可以在每个组合函数绑定上设置输入主题,如下所示。

spring.cloud.stream.bindings.uppercasepostfixed-in-0.destination=<input-topic>
spring.cloud.stream.bindings.uppercaseprefixed-in-0.destination=<input-topic>

通过这种函数组合方法,您可以从 Kafka Streams 获得端到端的语义,并避免额外的中间主题。这里的缺点是 uppercase 函数将为每个传入记录调用两次。

上述方法可行,但在将它们用于您的用例之前请考虑权衡取舍。