如何在 Apache Flink 中使用 TTL 使键控状态过期?

How to expire keyed state with TTL in Apache Flink?

我有这样的管道:

 env.addSource(kafkaConsumer)
            .keyBy { value -> value.f0 }
            .window(EventTimeSessionWindows.withGap(Time.minutes(2)))
            .reduce(::reduceRecord)
            .addSink(kafkaProducer)

我想使用 TTL 使键控数据过期。

一些博客文章指出我需要一个 ValueStateDescriptor。 我做了一个这样的:

val desc = ValueStateDescriptor("val state", MyKey::class.java)
desc.enableTimeToLive(ttlConfig)

但我如何实际将此描述符应用到我的管道,以便它实际执行 TTL 到期?

您描述的管道没有使用任何可以从设置状态 TTL 中获益的键控状态。管道中唯一的键控状态是会话 windows 的内容,并且该状态将在会话关闭时尽快清除。 (此外,由于您使用的是 reduce 函数,因此该状态每个键仅包含一个值。)

在大多数情况下,过期状态仅与您明确创建的状态相关,在这种情况下,您可以随时访问状态描述符并可以将其配置为使用状态 TTL。 Flink SQL 确实会代表您创建可能不会自动过期的状态,在这种情况下,您将需要使用 Idle State Retention Time 来配置它。 CEP 库还代表您创建状态,在这种情况下,您应该确保您的模式最终匹配​​或超时。