Kafka KStream——拓扑设计
Kafka KStream - topology design
我的流是 key/value 对,我想将它们保存到数据库中作为 'raw' 并按 60 秒聚合。原来我是这样做的:
->foreach
/
kStreamBuilder.stream->aggregateBy->process
但后来我发现
一个。 .aggregateby()
只有 returns 它匹配的对(我需要所有的 - 匹配或其他)
b.我可以在 .process()
阶段使用 HashMap 实现相同的聚合效果。然后,当 .punctuate()
被调用时,我将所有 k/v 对写入数据库。
因此生成的拓扑变为:
kStreamBuilder.stream->foreach
kStreamBuilder.stream->process
问题:
- 这是一个 'reasonable' 方法来获得写入所有匹配或不匹配的 kv 对的结果吗? (所有值通过 foreach 和任何对 + 其余通过 process)
- 我是否需要(以某种方式)在将原始流发送到
.foreach()
和 .process()
之前对其进行拆分,或者是否足以执行上述操作?
DSL层的聚合是为"incremental aggregation"设计的,即当前聚合结果加上单个新值是"added"。如果要一次访问 60 秒 window 的所有 "raw record",则需要使用处理器 API.
如果你有两个下游运营商,你不需要做任何事情。记录将自动转发给双方。但是,请记住,它们不会被复制,即,对于每条记录,两个下游操作员都会看到相同的 Java 对象!
我的流是 key/value 对,我想将它们保存到数据库中作为 'raw' 并按 60 秒聚合。原来我是这样做的:
->foreach
/
kStreamBuilder.stream->aggregateBy->process
但后来我发现
一个。 .aggregateby()
只有 returns 它匹配的对(我需要所有的 - 匹配或其他)
b.我可以在 .process()
阶段使用 HashMap 实现相同的聚合效果。然后,当 .punctuate()
被调用时,我将所有 k/v 对写入数据库。
因此生成的拓扑变为:
kStreamBuilder.stream->foreach
kStreamBuilder.stream->process
问题:
- 这是一个 'reasonable' 方法来获得写入所有匹配或不匹配的 kv 对的结果吗? (所有值通过 foreach 和任何对 + 其余通过 process)
- 我是否需要(以某种方式)在将原始流发送到
.foreach()
和.process()
之前对其进行拆分,或者是否足以执行上述操作?
DSL层的聚合是为"incremental aggregation"设计的,即当前聚合结果加上单个新值是"added"。如果要一次访问 60 秒 window 的所有 "raw record",则需要使用处理器 API.
如果你有两个下游运营商,你不需要做任何事情。记录将自动转发给双方。但是,请记住,它们不会被复制,即,对于每条记录,两个下游操作员都会看到相同的 Java 对象!