访问动态变化的 Apache Flink 中的每个键状态存储
Accessing per-key state store in Apache Flink that changes dynamically
我有一连串使用不同密钥的消息。对于每个键,我想创建一个事件时间会话 window 并仅在以下情况下对其进行一些处理:
MIN_EVENTS
事件数已累积在 window(本质上是键控状态)
对于每个键,MIN_EVENTS
是不同的并且可能会在运行时发生变化。我很难实现这个。特别是,我是这样实现这个逻辑的:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
我正在尝试创建一个自定义 MyCustomCountTrigger()
,它应该能够从 MapState<String, Integer> stateStore
等将 key
映射到它的 MIN_EVENTS
参数的状态存储中读取。我知道我可以使用所有触发器都可用的 TriggerContext ctx
对象访问状态存储。
如何从 CountTrigger() 外部初始化此状态存储 class?我没能找到这样做的例子。
您可以根据发送到触发器构造函数的参数初始化状态 class。但是您无法从 class 外部访问该状态。
如果您需要更大的灵活性,我建议您使用过程函数而不是 window。
我有一连串使用不同密钥的消息。对于每个键,我想创建一个事件时间会话 window 并仅在以下情况下对其进行一些处理:
MIN_EVENTS
事件数已累积在 window(本质上是键控状态)
对于每个键,MIN_EVENTS
是不同的并且可能会在运行时发生变化。我很难实现这个。特别是,我是这样实现这个逻辑的:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
我正在尝试创建一个自定义 MyCustomCountTrigger()
,它应该能够从 MapState<String, Integer> stateStore
等将 key
映射到它的 MIN_EVENTS
参数的状态存储中读取。我知道我可以使用所有触发器都可用的 TriggerContext ctx
对象访问状态存储。
如何从 CountTrigger() 外部初始化此状态存储 class?我没能找到这样做的例子。
您可以根据发送到触发器构造函数的参数初始化状态 class。但是您无法从 class 外部访问该状态。
如果您需要更大的灵活性,我建议您使用过程函数而不是 window。