Apache Flink:Window 函数和时间的开始
Apache Flink: Window Functions and the beginning of time
在 WindowAssigner
中,一个元素被分配给一个或多个 TimeWindow
实例。在滑动事件时间 window 的情况下,这发生在 SlidingEventTimeWindows#assignWindows
1.
如果 window 带有 size=5
和 slide=1
,带有 timestamp=0
的元素被分配到以下 windows:
- Window(开始=0,结束=5)
- Window(开始=-1,结束=4)
- Window(开始=-2,结束=3)
- Window(开始=-3,结束=2)
- Window(开始=-4,结束=1)
在一张照片中:
+-> Beginning of time
|
|
+----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[-1,4[ Window 2 XXXXX |
| t=[-2,3[ Window 3 XXXXX |
| t=[-3,2[ Window 4 XXXXX |
| t=[-4,1[ Window 5 XXXXX |
| |
| time(-4 to +4) ---- |
| 432101234 |
+---------------------------+------------------+
|
|
|
+
有没有办法告诉Flink有时间的开始而之前没有windows?如果不是,从哪里着手改变它? 在上面的例子中,Flink 应该只有一个 window (t=[4,8[ Window 1
) 作为第一个元素。像这样:
+-> Beginning of time
|
|
+-----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[ 1,6[ Window 2 XXXXX |
| t=[ 2,7[ Window 3 XXXXX |
| t=[ 3,8[ Window 4 XXXXX |
| t=[ 4,9[ Window 5 XXXXX |
| |
| time(-4 to +8) ---- |
| 4321012345678 |
+---------------------------+-------------------+
|
|
|
+
一旦 windows 的数量达到并超过 window 大小,这将不再起作用。那么,在上面的例子中,所有元素都在 5 Windows.
之内
脚注:
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows
目前无法指定Flink作业的有效时间间隔。这也可能有点问题,因为您可能也想将您的工作应用于历史数据。
不过,您可以做的是手动过滤 windows 在超时开始之前开始的内容:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val startTime = 1
val windowLength = 2
val slide = 1
val input = env.fromElements((1,1), (2,2), (3,3))
.assignAscendingTimestamps(x => x._2)
val windowed = input
.timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
.apply{ (window, iterable, collector: Collector[Int]) =>
if (window.getStart >= startTime) {
collector.collect(iterable.map(_._1).reduce(_ + _))
} else {
// discard early windows
}
}
windowed.print()
env.execute()
我可能会找到更好的解决方法来解决这个问题。
这个想法是将水印设置到未来足够远的时间点,这样您的 windows 就有足够的数据。早期的 windows 仍然存在,但它们将被丢弃。
这是 AssignerWithPeriodicWatermarks[T]
的概念证明:
class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
var t: Option[Long] = None
var firstTime = true
override def extractTimestamp(el: T, prevTs: Long): Long = {
t = Some(prevTs)
prevTs
}
override def getCurrentWatermark(): Watermark = (t, firstTime) match {
case (None, _) => return null
case (Some(v), false) => new Watermark(v)
case (Some(v), true) => {
firstTime = false
new Watermark(v + wait)
}
}
}
`wait` 是您第一个 window 的大小。
似乎工作正常,但我对 flink 的了解不够确定。
更新:不幸的是,它不起作用(现在我不知道为什么会这样),"early windows" 的密钥流中总是很少有密钥.所以最后我只是过滤了错误的 windows 类似的东西:
val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>
state match {
case None => (Seq(), Some(1))
case Some(s) => (Seq(in), Some(s))
case Some(v) => (Seq(), Some(v+1))
})
在 WindowAssigner
中,一个元素被分配给一个或多个 TimeWindow
实例。在滑动事件时间 window 的情况下,这发生在 SlidingEventTimeWindows#assignWindows
1.
如果 window 带有 size=5
和 slide=1
,带有 timestamp=0
的元素被分配到以下 windows:
- Window(开始=0,结束=5)
- Window(开始=-1,结束=4)
- Window(开始=-2,结束=3)
- Window(开始=-3,结束=2)
- Window(开始=-4,结束=1)
在一张照片中:
+-> Beginning of time
|
|
+----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[-1,4[ Window 2 XXXXX |
| t=[-2,3[ Window 3 XXXXX |
| t=[-3,2[ Window 4 XXXXX |
| t=[-4,1[ Window 5 XXXXX |
| |
| time(-4 to +4) ---- |
| 432101234 |
+---------------------------+------------------+
|
|
|
+
有没有办法告诉Flink有时间的开始而之前没有windows?如果不是,从哪里着手改变它? 在上面的例子中,Flink 应该只有一个 window (t=[4,8[ Window 1
) 作为第一个元素。像这样:
+-> Beginning of time
|
|
+-----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[ 1,6[ Window 2 XXXXX |
| t=[ 2,7[ Window 3 XXXXX |
| t=[ 3,8[ Window 4 XXXXX |
| t=[ 4,9[ Window 5 XXXXX |
| |
| time(-4 to +8) ---- |
| 4321012345678 |
+---------------------------+-------------------+
|
|
|
+
一旦 windows 的数量达到并超过 window 大小,这将不再起作用。那么,在上面的例子中,所有元素都在 5 Windows.
之内脚注:
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows
目前无法指定Flink作业的有效时间间隔。这也可能有点问题,因为您可能也想将您的工作应用于历史数据。
不过,您可以做的是手动过滤 windows 在超时开始之前开始的内容:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val startTime = 1
val windowLength = 2
val slide = 1
val input = env.fromElements((1,1), (2,2), (3,3))
.assignAscendingTimestamps(x => x._2)
val windowed = input
.timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
.apply{ (window, iterable, collector: Collector[Int]) =>
if (window.getStart >= startTime) {
collector.collect(iterable.map(_._1).reduce(_ + _))
} else {
// discard early windows
}
}
windowed.print()
env.execute()
我可能会找到更好的解决方法来解决这个问题。
这个想法是将水印设置到未来足够远的时间点,这样您的 windows 就有足够的数据。早期的 windows 仍然存在,但它们将被丢弃。
这是 AssignerWithPeriodicWatermarks[T]
的概念证明:
class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
var t: Option[Long] = None
var firstTime = true
override def extractTimestamp(el: T, prevTs: Long): Long = {
t = Some(prevTs)
prevTs
}
override def getCurrentWatermark(): Watermark = (t, firstTime) match {
case (None, _) => return null
case (Some(v), false) => new Watermark(v)
case (Some(v), true) => {
firstTime = false
new Watermark(v + wait)
}
}
}
更新:不幸的是,它不起作用(现在我不知道为什么会这样),"early windows" 的密钥流中总是很少有密钥.所以最后我只是过滤了错误的 windows 类似的东西:
val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>
state match {
case None => (Seq(), Some(1))
case Some(s) => (Seq(in), Some(s))
case Some(v) => (Seq(), Some(v+1))
})