Does the Kafka Streams StreamsBuilder always detect "duplicate" input topics?
This code creates two separate KStream instances, both reading from the same topic:
final KStream<String, String> inputStream1 =
    builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> inputStream2 =
    builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> mappedStream1 = inputStream1
    .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream2
    .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toUpperCase);
mappedStream1.to(OUTPUT_TOPIC_1, produced);
mappedStream2.to(OUTPUT_TOPIC_2, produced);
The topology looks like this. Only one source is defined, and it is used twice:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004
Processor: KSTREAM-PEEK-0000000002 (stores: [])
--> KSTREAM-MAPVALUES-0000000003
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000004 (stores: [])
--> KSTREAM-MAPVALUES-0000000005
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KSTREAM-PEEK-0000000002
Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])
--> KSTREAM-SINK-0000000007
<-- KSTREAM-PEEK-0000000004
Sink: KSTREAM-SINK-0000000006 (topic: output-1)
<-- KSTREAM-MAPVALUES-0000000003
Sink: KSTREAM-SINK-0000000007 (topic: output-2)
<-- KSTREAM-MAPVALUES-0000000005
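Now my question is: is it always safe to assume that StreamsBuilder creates only one source (= only one consumer for the same topic)?
In other words: given a topic with multiple partitions, is it always guaranteed that inputStream1 and inputStream2 see the same records?
Or would it be better to rewrite the code like this, to make the sharing explicit: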
final KStream<String, String> inputStream =
    builder.stream(INPUT_TOPIC, consumed);
final KStream<String, String> mappedStream1 = inputStream
    .peek((k, v) -> System.out.println("1: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toLowerCase);
final KStream<String, String> mappedStream2 = inputStream
    .peek((k, v) -> System.out.println("2: " + k + " -> " + v))
    .mapValues((ValueMapper<String, String>) String::toUpperCase);
Update
The second version results in this topology:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-PEEK-0000000001, KSTREAM-PEEK-0000000003
Processor: KSTREAM-PEEK-0000000001 (stores: [])
--> KSTREAM-MAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-PEEK-0000000003 (stores: [])
--> KSTREAM-MAPVALUES-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-PEEK-0000000001
Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
--> KSTREAM-SINK-0000000006
<-- KSTREAM-PEEK-0000000003
Sink: KSTREAM-SINK-0000000005 (topic: output-1)
<-- KSTREAM-MAPVALUES-0000000002
Sink: KSTREAM-SINK-0000000006 (topic: output-2)
<-- KSTREAM-MAPVALUES-0000000004
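The builder is the same in both cases, and so is the application.id.
I can't speak for how the topology is built, but consider the flow at the Consumer API level: the group.id is derived from the application.id, so both streams would belong to the same consumer group.
Within one consumer group, each partition of the input topic is assigned to only one consumer instance, so only one consumer (out of the two) could read it.
That would explain why there is only one source; hence, you don't need the additional builder.stream() call with the same arguments.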
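Rather than assuming the sources were merged, one option is to check the printed topology. Below is a minimal sketch in plain Java that counts how many source nodes mention each topic in a topology description string. The class name SourceCounter and the method countSourcesPerTopic are hypothetical, and the parser assumes the text layout shown in the question ("Source: <name> (topics: [<topic>, ...])"). In a real application the string would come from builder.build().describe().toString().

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams.
final class SourceCounter {

    // Assumed line format, taken from the output in the question:
    // "Source: KSTREAM-SOURCE-0000000000 (topics: [input])"
    private static final Pattern SOURCE_LINE =
            Pattern.compile("Source:\\s+\\S+\\s+\\(topics:\\s+\\[([^\\]]*)\\]\\)");

    // Returns, for each topic name, the number of source nodes that read from it.
    static Map<String, Integer> countSourcesPerTopic(String description) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        Matcher matcher = SOURCE_LINE.matcher(description);
        while (matcher.find()) {
            // A single source node may list several comma-separated topics.
            for (String topic : matcher.group(1).split(",")) {
                String name = topic.trim();
                if (!name.isEmpty()) {
                    counts.merge(name, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Abbreviated copy of the first topology from the question.
        String description = String.join("\n",
                "Topologies:",
                "Sub-topology: 0",
                "Source: KSTREAM-SOURCE-0000000000 (topics: [input])",
                "--> KSTREAM-PEEK-0000000002, KSTREAM-PEEK-0000000004",
                "Sink: KSTREAM-SINK-0000000006 (topic: output-1)",
                "Sink: KSTREAM-SINK-0000000007 (topic: output-2)");
        System.out.println(countSourcesPerTopic(description)); // prints {input=1}
    }
}
```

A count of 1 for the input topic only confirms what the builder produced for the Kafka Streams version in use; it is not by itself a guarantee about other versions. Reusing a single KStream variable, as in the second version of the code, avoids depending on that behavior.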