根据存储对象的属性修剪 flink state

Pruning flink state based on attributes of stored object

考虑以下示例 class,其实例存储在 ListState:

class BusinessObject {
  long clientId;
  String region;
  Instant lastDealDate; 
  bool isActive;
}

应用程序要求如果自上次与特定客户达成交易 (lastDealDate) 已过去 1 年,此对象不应处于 flink 状态 and 客户端未激活即 isActive == false

解决这个问题并让 flink 知道这两个因素以便它自动删除这些条目的正确方法是什么?目前,我读取了状态中的所有项目,清除状态,然后添加回相关的项目,但是随着客户端数量的增加和状态的规模变大,这将开始花费很长时间。我的大部分在线搜索都在谈论使用 time-to-live 并通过 descriptor 为我的状态设置它。但是,我的逻辑不能依赖 processing/event/ingestion 时间,我还需要检查 isActive 是否为假。

额外信息:上下文没有键控,后端是 RocksDB。使用 ListState 的原因是因为根据上述条件需要每天转储所有相关的 state/history。

有什么建议吗?

使用 RocksDB 状态后端,Flink 可以在不经过 serialization/deserialization 的情况下附加到 ListState,但是由于 ser/de.

,除了附加之外的任何读取或修改都是昂贵的

如果您可以重新工作以便将这些 BusinessObjects 存储在 MapState 中,即使您偶尔需要遍历整个地图,您也会过得更好。 MapState 中的每个 key/value 对都将是一个单独的 RocksDB 条目,您将能够单独 create/update/delete 它们而无需遍历 ser/de 整个地图(除非您有扫描)。 (对于它的价值,在 RocksDB 中迭代 MapState 以 serialized-key-sorted 顺序遍历地图。)

MapState 只能作为键控(或广播)状态使用,因此此更改需要您对流进行键控。使用 keyBy 确实会强制进行网络洗牌(和 ser/de),因此它会很昂贵,但不如使用 ListState 昂贵。