Flink 中最大时间 window 的更新流

Stream of updates to the largest time window in Flink

从一个时间 window 编辑的键控流中,我想获得迄今为止看到的最大 window 的流(在元素数量方面最大)。

目前我有以下代码:

source
  .keyBy(...)
  .timeWindow(...)
  .fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
  .keyBy(_ => ())
  .maxBy(1)

fold 的结果是 (key, count) 元素的流 - 所以从这个流中,我想获得 "key with highest count".[=16= 的更新流]

然后我输入一个常量(keyBy(_ => ()) - 因为这是一个全局操作),然后使用 maxBy - 这个 almost 有效:我正在获取最高计数流,但每个元素都会发出当前最高计数。

我认为我正在寻找的是某种具有先前值的过滤器,它只会在新值与先前值不同时才发出元素。

目前Flink可以做到吗?

Flink 默认没有这样的过滤器,但是你自己实现一个应该很容易。

您可以使用类似于此的有状态 FlatMap 来执行此操作:

val source: DataStream[Int] = ???

source
  .keyBy(_: Int => _)
  .timeWindow(Time.minutes(10))
  .fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
  // move everything to the same key
  .keyBy(_ => 0) 
  // use stateful flatmap to remember highest count and filter by that
  .flatMapWithState( (in, state: Option[Int]) => 
    // filter condition
    if (in._2 > state.getOrElse(-1)) 
      // emit new value and update max count
      (Seq(in), Some(in._2)) 
    else 
      // emit nothing (empty Seq()) and keep count
      (Seq(), state)
  ).setParallelism(1)

如果非并行(单线程)过滤器运算符成为瓶颈,您可以通过添加具有随机键的 keyBy 和具有更高并行度的有状态过滤器 FlatMap 来添加并行预过滤器.