kafka 流中的聚合和状态存储保留
Aggregration and state store retention in kafka streams
我有一个如下的用例。对于每个传入事件,我想看看
某个字段以查看其状态是否从 A 更改为 B,如果是,则将其发送给
输出主题。流程是这样的:一个键为 "xyz" 的事件以状态 A 进来,一段时间后
另一个事件带有密钥 "xyz",状态为 B。我有使用高级 DSL 的代码。
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
是否有使用 DSL 编写此逻辑的更好方法?
关于上面代码中聚合创建的状态存储的几个问题。
- 是否默认创建内存状态存储?
- 如果我有无限数量的唯一传入键会怎样?
如果它默认使用内存存储,我不需要切换到持久存储吗?
我们如何处理 DSL 中的这种情况?
- 如果状态存储非常大(内存中或持久化),它有什么影响
启动时间?如何让流处理等待,以便商店完全初始化?
或者 Kafka Streams 会确保状态存储在处理任何传入事件之前完全初始化吗?
提前致谢!
默认情况下,将使用持久性 RocksDB 存储。如果你想使用内存存储,你会传入 Materialized.as(Stores.inMemoryKeyValueStore(...))
如果您有无限数量的唯一键,您最终会 运行 主内存或磁盘不足,您的应用程序将死掉。根据您的语义,您可以通过使用带有大 "gap" 参数的会话窗口聚合来获得 "TTL",而不是使旧密钥过期。
在处理新数据之前总是会恢复状态。如果您使用内存存储,这将通过使用底层变更日志主题来实现。根据您所在州的大小,这可能需要一段时间。如果您使用持久性 RocksDB 存储,状态将从磁盘加载,因此不需要恢复,并且应该立即进行处理。仅当您丢失本地磁盘上的状态时,才会在这种情况下从变更日志主题恢复。
我有一个如下的用例。对于每个传入事件,我想看看 某个字段以查看其状态是否从 A 更改为 B,如果是,则将其发送给 输出主题。流程是这样的:一个键为 "xyz" 的事件以状态 A 进来,一段时间后 另一个事件带有密钥 "xyz",状态为 B。我有使用高级 DSL 的代码。
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
是否有使用 DSL 编写此逻辑的更好方法?
关于上面代码中聚合创建的状态存储的几个问题。
- 是否默认创建内存状态存储?
- 如果我有无限数量的唯一传入键会怎样? 如果它默认使用内存存储,我不需要切换到持久存储吗? 我们如何处理 DSL 中的这种情况?
- 如果状态存储非常大(内存中或持久化),它有什么影响 启动时间?如何让流处理等待,以便商店完全初始化? 或者 Kafka Streams 会确保状态存储在处理任何传入事件之前完全初始化吗?
提前致谢!
默认情况下,将使用持久性 RocksDB 存储。如果你想使用内存存储,你会传入
Materialized.as(Stores.inMemoryKeyValueStore(...))
如果您有无限数量的唯一键,您最终会 运行 主内存或磁盘不足,您的应用程序将死掉。根据您的语义,您可以通过使用带有大 "gap" 参数的会话窗口聚合来获得 "TTL",而不是使旧密钥过期。
在处理新数据之前总是会恢复状态。如果您使用内存存储,这将通过使用底层变更日志主题来实现。根据您所在州的大小,这可能需要一段时间。如果您使用持久性 RocksDB 存储,状态将从磁盘加载,因此不需要恢复,并且应该立即进行处理。仅当您丢失本地磁盘上的状态时,才会在这种情况下从变更日志主题恢复。