kafka steams 应用程序是否可以从单个输入写入多个输出?

Is it possible for a kafka steams application to write multiple outputs from a single input?

我不确定 kafka-streams 是否是我要解决的问题的正确解决方案。由于它提供的并行性和容错性,我希望能够使用它,但我正在努力想出一种方法来实现所需的处理管道。

管道是这样的:

  1. 某种类型的记录到达输入主题
  2. 此记录中的信息用于执行数据库查询,其中 returns 很多结果

我希望能够将每个结果作为单独的记录写出,具有自己的键,而不是作为单个记录中的结果集合。

暂时忽略每个结果要求的单个输出记录,我的代码如下所示:

Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<List<MyOutput>> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);

KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
outputs.to("output-topic", Produced.with(stringSerde, outputSerde));

这很简单,1 条消息输入,1 条消息(尽管是集合)输出。

我希望能够做的是:

Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<MyOutput> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);

KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
KStream<String, MyOutput> sink = outputs.???
sink.to("output-topic", Produced.with(stringSerde, outputSerde));

对于在 outputs 流上执行的一个或多个操作,我想不出任何合理的方法。

有什么建议吗?或者 kafka-streams 可能不是解决此类问题的正确方法?

是的,这是可能的,为此您需要使用 KStream flatMap 转换。 FlatMap 将输入流的每条记录转换为输出流中的零条或多条记录(键和值类型均可任意更改)

kStream = kStream.flatMap(
        (key, value) -> {
            List<KeyValue<String, MyOutput>> result = new ArrayList<>();
            // do your logic here
            return result;
        });
kStream.to("output-topic", Produced.with(stringSerde, outputSerde));

谢谢,Vasiliy,flatMap 确实是我所需要的。之前看了一下,以为是正确的操作,后来一头雾水,误删了。

结合我之前的建议和你的建议,下面的工作,假设 MyOutput 实现了一个名为 getKey() 的方法:

Serde<String> stringSerde = Serdes.String();
JsonSerde<MyInput> inputSerde = new JsonSerde<>();
JsonSerde<MyOutput> outputSerde = new JsonSerde<>();
Consumed<String, MyInput> consumer = Consumed.with(stringSerde, inputSerde);

KStream<String, MyInput> receiver = builder.stream("input-topic", consumer);
KStream<String, List<MyOutput>> outputs = receiver.mapValues(this::mapInputToManyOutputs);
KStream<String, MyOutput> sink = outputs.flatMap(((key, value) -> 
    value.stream().map(o -> new KeyValue<>(o.getKey(), o)).collect(Collectors.toList())));
sink.to("output-topic", Produced.with(stringSerde, outputSerde));