kafka 流中的聚合和状态存储保留

Aggregration and state store retention in kafka streams

我有一个如下的用例。对于每个传入事件,我想看看 某个字段以查看其状态是否从 A 更改为 B,如果是,则将其发送给 输出主题。流程是这样的:一个键为 "xyz" 的事件以状态 A 进来,一段时间后 另一个事件带有密钥 "xyz",状态为 B。我有使用高级 DSL 的代码。

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
          .map((k, v) -> new KeyValue<>(v.getId(), v))
                    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                    .aggregate(DomainStatusMonitor::new,
                            (k, v, aggregate) -> {
                                aggregate.updateStatusMonitor(v);
                                return aggregate;
                            }, Materialized.with(Serdes.String(), jsonSerde))
                    .toStream()
                    .filter((k, v) -> v.isStatusChangedFromAtoB())
                    .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));

是否有使用 DSL 编写此逻辑的更好方法?

关于上面代码中聚合创建的状态存储的几个问题。

  1. 是否默认创建内存状态存储?
  2. 如果我有无限数量的唯一传入键会怎样? 如果它默认使用内存存储,我不需要切换到持久存储吗? 我们如何处理 DSL 中的这种情况?
  3. 如果状态存储非常大(内存中或持久化),它有什么影响 启动时间?如何让流处理等待,以便商店完全初始化? 或者 Kafka Streams 会确保状态存储在处理任何传入事件之前完全初始化吗?

提前致谢!

  1. 默认情况下,将使用持久性 RocksDB 存储。如果你想使用内存存储,你会传入 Materialized.as(Stores.inMemoryKeyValueStore(...))

  2. 如果您有无限数量的唯一键,您最终会 运行 主内存或磁盘不足,您的应用程序将死掉。根据您的语义,您可以通过使用带有大 "gap" 参数的会话窗口聚合来获得 "TTL",而不是使旧密钥过期。

  3. 在处理新数据之前总是会恢复状态。如果您使用内存存储,这将通过使用底层变更日志主题来实现。根据您所在州的大小,这可能需要一段时间。如果您使用持久性 RocksDB 存储,状态将从磁盘加载,因此不需要恢复,并且应该立即进行处理。仅当您丢失本地磁盘上的状态时,才会在这种情况下从变更日志主题恢复。