Flink 中最大时间 window 的更新流
Stream of updates to the largest time window in Flink
从一个时间 window 编辑的键控流中,我想获得迄今为止看到的最大 window 的流(在元素数量方面最大)。
目前我有以下代码:
source
.keyBy(...)
.timeWindow(...)
.fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
.keyBy(_ => ())
.maxBy(1)
fold
的结果是 (key, count)
元素的流 - 所以从这个流中,我想获得 "key with highest count".[=16= 的更新流]
然后我输入一个常量(keyBy(_ => ())
- 因为这是一个全局操作),然后使用 maxBy
- 这个 almost 有效:我正在获取最高计数流,但每个元素都会发出当前最高计数。
我认为我正在寻找的是某种具有先前值的过滤器,它只会在新值与先前值不同时才发出元素。
目前Flink可以做到吗?
Flink 默认没有这样的过滤器,但是你自己实现一个应该很容易。
您可以使用类似于此的有状态 FlatMap
来执行此操作:
val source: DataStream[Int] = ???
source
.keyBy(_: Int => _)
.timeWindow(Time.minutes(10))
.fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
// move everything to the same key
.keyBy(_ => 0)
// use stateful flatmap to remember highest count and filter by that
.flatMapWithState( (in, state: Option[Int]) =>
// filter condition
if (in._2 > state.getOrElse(-1))
// emit new value and update max count
(Seq(in), Some(in._2))
else
// emit nothing (empty Seq()) and keep count
(Seq(), state)
).setParallelism(1)
如果非并行(单线程)过滤器运算符成为瓶颈,您可以通过添加具有随机键的 keyBy
和具有更高并行度的有状态过滤器 FlatMap
来添加并行预过滤器.
从一个时间 window 编辑的键控流中,我想获得迄今为止看到的最大 window 的流(在元素数量方面最大)。
目前我有以下代码:
source
.keyBy(...)
.timeWindow(...)
.fold((DummyKey, 0)) { case ((_, current), key) => (key, current + 1) }
.keyBy(_ => ())
.maxBy(1)
fold
的结果是 (key, count)
元素的流 - 所以从这个流中,我想获得 "key with highest count".[=16= 的更新流]
然后我输入一个常量(keyBy(_ => ())
- 因为这是一个全局操作),然后使用 maxBy
- 这个 almost 有效:我正在获取最高计数流,但每个元素都会发出当前最高计数。
我认为我正在寻找的是某种具有先前值的过滤器,它只会在新值与先前值不同时才发出元素。
目前Flink可以做到吗?
Flink 默认没有这样的过滤器,但是你自己实现一个应该很容易。
您可以使用类似于此的有状态 FlatMap
来执行此操作:
val source: DataStream[Int] = ???
source
.keyBy(_: Int => _)
.timeWindow(Time.minutes(10))
.fold((1, 0)) { case ((_, current), key) => (key, current + 1) }
// move everything to the same key
.keyBy(_ => 0)
// use stateful flatmap to remember highest count and filter by that
.flatMapWithState( (in, state: Option[Int]) =>
// filter condition
if (in._2 > state.getOrElse(-1))
// emit new value and update max count
(Seq(in), Some(in._2))
else
// emit nothing (empty Seq()) and keep count
(Seq(), state)
).setParallelism(1)
如果非并行(单线程)过滤器运算符成为瓶颈,您可以通过添加具有随机键的 keyBy
和具有更高并行度的有状态过滤器 FlatMap
来添加并行预过滤器.