Kafka Streams - 低级处理器 API - RocksDB TimeToLive(TTL)
Kafka Streams - Low-Level Processor API - RocksDB TimeToLive(TTL)
我正在尝试使用低级处理器 API。我正在使用处理器 API 对传入记录进行数据聚合,并将聚合记录写入 RocksDB。
但是,我想保留添加到 rocksdb 中的记录仅在 24 小时内有效。 24 小时后记录应被删除。这可以通过更改 ttl 设置来完成。但是,没有太多文档可以帮助我解决这个问题。
如何更改 ttl 值?我应该使用什么 java api 来将 ttl 时间设置为 24 小时?当前的默认 ttl 设置时间是多少?
我相信这目前没有通过 api 或配置公开。
RocksDBStore 在打开 RocksDB 时传递一个硬编码的 TTL:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L158
并且硬编码值只是 TTL_SECONDS = TTL_NOT_USED (-1)(请参阅同一文件中的第 79 行)。
目前有 2 个关于在 state stores 中公开 TTL 支持的公开票:KAFKA-4212 和 KAFKA-4273:
https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22rocksdb%20ttl%22
我建议您对其中一个描述您的用例的评论进行评论,以推动他们前进。
在此期间,如果您现在需要 TTL 功能,状态存储是可插入的,并且 RocksDBStore 源随时可用,因此您可以分叉它并设置您的 TTL 值(或者,像与 KAFKA 相关的拉取请求-4273 建议,从配置中获取。
我知道这并不理想,真诚地希望有人能给出更满意的答案。
我正在尝试使用低级处理器 API。我正在使用处理器 API 对传入记录进行数据聚合,并将聚合记录写入 RocksDB。
但是,我想保留添加到 rocksdb 中的记录仅在 24 小时内有效。 24 小时后记录应被删除。这可以通过更改 ttl 设置来完成。但是,没有太多文档可以帮助我解决这个问题。
如何更改 ttl 值?我应该使用什么 java api 来将 ttl 时间设置为 24 小时?当前的默认 ttl 设置时间是多少?
我相信这目前没有通过 api 或配置公开。 RocksDBStore 在打开 RocksDB 时传递一个硬编码的 TTL: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L158 并且硬编码值只是 TTL_SECONDS = TTL_NOT_USED (-1)(请参阅同一文件中的第 79 行)。
目前有 2 个关于在 state stores 中公开 TTL 支持的公开票:KAFKA-4212 和 KAFKA-4273: https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22rocksdb%20ttl%22 我建议您对其中一个描述您的用例的评论进行评论,以推动他们前进。
在此期间,如果您现在需要 TTL 功能,状态存储是可插入的,并且 RocksDBStore 源随时可用,因此您可以分叉它并设置您的 TTL 值(或者,像与 KAFKA 相关的拉取请求-4273 建议,从配置中获取。
我知道这并不理想,真诚地希望有人能给出更满意的答案。