Flink windows 和延迟事件
Flink windows and late events
我有一个用例,我需要处理与正常事件不同的延迟事件:如果事件在其 window 关闭后到达,则应将其发送到另一条路径。
我认为 .sideOutputLateData(..)
会为我解决这个问题。它在正常情况下(即使用真实世界的数据)。但是如果我想测试它,用伪造的数据,它就停止工作了。
我希望是这样的:
val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setParallelism(1)
val events: DataStream[(Int, Long)] = env.fromElements(
(1, 1),
(1, 15),
(1, 25),
(1, 8) //late Event
)
val lateEvents = OutputTag[(Int, Long)]("lateEvents")
val windowedSum = events
.assignAscendingTimestamps(e => e._2)
.windowAll(TumblingEventTimeWindows.of(time.Time.milliseconds(10)))
.sideOutputLateData(lateEvents)
.sum(position=0)
val lateEventsStream = windowedSum
.getSideOutput(lateEvents)
// Handle differently
.map(e => (e._1 + 100, e._2))
windowedSum.print()
lateEventsStream.print()
// execute program
env.execute("Flink Scala watermarking test")
会导致:
[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101, 8)
相反,我得到:
[info] (2,1)
[info] (1,15)
[info] (1,25)
如果我使用 socketTextStream
作为具有相同数据的源,它会按预期工作。
这告诉我,不知何故,水印没有像它应该以非常快的数据输入那样前进。
我尝试将 setAutoWatermarkInterval
调整为一个非常小的值,但没有成功。
我错过了什么吗?我怎样才能测试我的工作?
您可以尝试使用 WatermarkStrategy
为每个元素发出 Watermark 而不是定期生成它们。您可以通过在实现 WatermarkGenerator
时在 onElement
内发出 Watermark 来做到这一点,如 here 所述。这是最好和最可靠的测试方法。
感谢@Dominik Wosinski 为我指明了正确的方向。
对于迷失在错综复杂的 Flink 文档中的其他人,我将 post 在这里给出我的解决方案:
正如所怀疑的那样,问题是快速输入数据没有推进水印。这是因为默认情况下,Flink 将每 200 毫秒检查一次水印是否应该前进。您可以使用
缩短此延迟(增加系统负载)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(10) // or even lower
显然这对于作为 4 元素列表的快速输入是不够的。
解决方案是在每个事件中发出水印(请注意,在生产环境中不推荐)。
要实现这样的解决方案,我们需要扩展 WatermarkGenerator
class:
class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {
override def onEvent(
event: (Int, Long),
eventTimestamp: Long,
output: WatermarkOutput
): Unit = {
// emit at every event
output.emitWatermark(new Watermark(event._2))
}
// do nothing at AutoWatermarkInterval
override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}
要将此生成器分配给流,我们首先需要创建一个 WatermarkStrategy
:
class MyStrategy extends WatermarkStrategy[(Int, Long)] {
override def createWatermarkGenerator(
context: WatermarkGeneratorSupplier.Context
): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
}
(这个class也可以实现一个可选的createTimestampAssigner
方法)
然后我们可以在流中使用它:
eventsStream
.assignTimestampsAndWatermarks(new MyStrategy())
我有一个用例,我需要处理与正常事件不同的延迟事件:如果事件在其 window 关闭后到达,则应将其发送到另一条路径。
我认为 .sideOutputLateData(..)
会为我解决这个问题。它在正常情况下(即使用真实世界的数据)。但是如果我想测试它,用伪造的数据,它就停止工作了。
我希望是这样的:
val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setParallelism(1)
val events: DataStream[(Int, Long)] = env.fromElements(
(1, 1),
(1, 15),
(1, 25),
(1, 8) //late Event
)
val lateEvents = OutputTag[(Int, Long)]("lateEvents")
val windowedSum = events
.assignAscendingTimestamps(e => e._2)
.windowAll(TumblingEventTimeWindows.of(time.Time.milliseconds(10)))
.sideOutputLateData(lateEvents)
.sum(position=0)
val lateEventsStream = windowedSum
.getSideOutput(lateEvents)
// Handle differently
.map(e => (e._1 + 100, e._2))
windowedSum.print()
lateEventsStream.print()
// execute program
env.execute("Flink Scala watermarking test")
会导致:
[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101, 8)
相反,我得到:
[info] (2,1)
[info] (1,15)
[info] (1,25)
如果我使用 socketTextStream
作为具有相同数据的源,它会按预期工作。
这告诉我,不知何故,水印没有像它应该以非常快的数据输入那样前进。
我尝试将 setAutoWatermarkInterval
调整为一个非常小的值,但没有成功。
我错过了什么吗?我怎样才能测试我的工作?
您可以尝试使用 WatermarkStrategy
为每个元素发出 Watermark 而不是定期生成它们。您可以通过在实现 WatermarkGenerator
时在 onElement
内发出 Watermark 来做到这一点,如 here 所述。这是最好和最可靠的测试方法。
感谢@Dominik Wosinski 为我指明了正确的方向。 对于迷失在错综复杂的 Flink 文档中的其他人,我将 post 在这里给出我的解决方案:
正如所怀疑的那样,问题是快速输入数据没有推进水印。这是因为默认情况下,Flink 将每 200 毫秒检查一次水印是否应该前进。您可以使用
缩短此延迟(增加系统负载)val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(10) // or even lower
显然这对于作为 4 元素列表的快速输入是不够的。
解决方案是在每个事件中发出水印(请注意,在生产环境中不推荐)。
要实现这样的解决方案,我们需要扩展 WatermarkGenerator
class:
class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {
override def onEvent(
event: (Int, Long),
eventTimestamp: Long,
output: WatermarkOutput
): Unit = {
// emit at every event
output.emitWatermark(new Watermark(event._2))
}
// do nothing at AutoWatermarkInterval
override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}
要将此生成器分配给流,我们首先需要创建一个 WatermarkStrategy
:
class MyStrategy extends WatermarkStrategy[(Int, Long)] {
override def createWatermarkGenerator(
context: WatermarkGeneratorSupplier.Context
): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
}
(这个class也可以实现一个可选的createTimestampAssigner
方法)
然后我们可以在流中使用它:
eventsStream
.assignTimestampsAndWatermarks(new MyStrategy())