一个清除非活动键的 Flink 流状态如何?

How does one cleanup Flink stream state for inactive keys?

我的目标是有一个 Flink 流程序,它保留最后 N 个 id,其中 id 是从事件中提取的。接收器是 Cassandra 存储,因此可以随时获取 ID 列表。重要的是,Cassandra 在每个事件发生后立即更新。

这可以通过 mapWithState 轻松实现(参见下面的代码)。但是,此代码存在重要问题。状态由 userid 键控。有些用户可能会活跃一段时间,然后就再也不会活跃了。我担心的是状态存储会永远增长。

非活动键的清理状态如何?

case class MyEvent(userId: Int, id: String)

env
  .addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
  .keyBy(_.userId)
  .mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
    val keepNIds = currentIds match {
      case None => Seq(in.id)
      case Some(cids) => (cids :+ in.id).takeRight(100)
    }
    ((in.userId, keepNIds), Some(keepNIds))
  }
  .addSink { in: (Int, Seq[String]) =>
    CassandraSink.appDatabase.idsTable.store(...)
  }

生长状态是一个重要而正确的观察。如果您的键空间正在移动,这肯定会发生。

Flink 1.2.0 添加了 ProcessFunction 来解决这个问题。 ProcessFunction 类似于 FlatMapFunction 但可以访问计时器服务。您可以注册计时器,它们在到期时调用 onTimer() 回调函数。回调可用于清理状态。