Kafka Streams StreamBuilder 是否始终检测 "duplicate" 个输入主题?

Does the Kafka Streams StreamBuilder always detect "duplicate" input topics?

此代码创建两个 KStream 个实例 分别 ,两者都从 相同 主题读取:

    final KStream<String, String> inputStream1 =
      builder.stream(INPUT_TOPIC, consumed);
    final KStream<String, String> inputStream2 =
      builder.stream(INPUT_TOPIC, consumed);

    final KStream<String, String> mappedStream1 = inputStream1
            .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toLowerCase);

    final KStream<String, String> mappedStream2 = inputStream2
            .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toUpperCase);

    mappedStream1.to(OUTPUT_TOPIC_1, produced);
    mappedStream2.to(OUTPUT_TOPIC_2, produced);

拓扑看起来像这样:只有一个 source 定义然后被使用 两次 :

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
    Processor: KSTREAM-PEEK-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000002
    Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-PEEK-0000000004
    Sink: KSTREAM-SINK-0000000006 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000007 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000005

现在我的问题是:假设 StreamBuilder 只创建一个来源(= 同一主题只有一个消费者)总是安全的吗?

换句话说:是否总能保证 - 给定一个具有多个分区的主题 - inputStream1inputStream2 看到相同的记录?

或者重写成这样更好,使其明确:

    final KStream<String, String> inputStream =
      builder.stream(INPUT_TOPIC, consumed);

    final KStream<String, String> mappedStream1 = inputStream
            .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toLowerCase);

    final KStream<String, String> mappedStream2 = inputStream
            .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
            .mapValues((ValueMapper<String, String>) String::toUpperCase);

更新

第二个版本导致此拓扑:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000003 (stores: [])
      --> KSTREAM-MAPVALUES-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-PEEK-0000000001
    Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000003
    Sink: KSTREAM-SINK-0000000005 (topic: output-1)
      <-- KSTREAM-MAPVALUES-0000000002
    Sink: KSTREAM-SINK-0000000006 (topic: output-2)
      <-- KSTREAM-MAPVALUES-0000000004

The builder would be the same, same application.id

不能代表拓扑结构,但考虑到 Consumer API 级别的流程,group.id 是基于 application.id 构建的,因此您的消费者组会两个流都相同。

对于一个输入主题,只有一个消费者实例(介于两者之间)能够从该输入主题中消费。

这可以解释为什么只有一个来源;因此,您不需要使用相同的参数进行额外的 builder.stream() 调用。