Flink 偶尔在 Process Time 或 Event Time 中处理记录
Flink processing records in Process Time or in Event Time sporadically
我有以下代码:
stream1
.connect(stream2)
.flatMap(new MyRichCoFlatMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Item>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
)
.keyBy(item -> item.getKey()).window(new MyWindowAssigner())
.trigger(MyEventTimeTrigger())
.process(new MyProcessWindowFunction());
此外,我设置
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
但是,在我的触发器中,我看到两个函数都有断点
onProcessingTime() is being called
有时
onEventTime()
这是什么原因?
原因是当你将EventTime
设置为时间特性时,Flink仍然会触发处理时间触发器,触发处理时间定时器,通常它会允许你在几个地方仍然使用ProcessingTime
.
这是正确的,在特定情况下可能非常方便,例如,如果您的输入源出现问题并且它没有产生任何输入,您仍然可以使用 ProcessingTime
触发器作为安全开关仍然关闭待定 windows.
请注意,如果您的代码编写正确,这应该不会导致任何问题。
我有以下代码:
stream1
.connect(stream2)
.flatMap(new MyRichCoFlatMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Item>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
)
.keyBy(item -> item.getKey()).window(new MyWindowAssigner())
.trigger(MyEventTimeTrigger())
.process(new MyProcessWindowFunction());
此外,我设置
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
但是,在我的触发器中,我看到两个函数都有断点
onProcessingTime() is being called
有时
onEventTime()
这是什么原因?
原因是当你将EventTime
设置为时间特性时,Flink仍然会触发处理时间触发器,触发处理时间定时器,通常它会允许你在几个地方仍然使用ProcessingTime
.
这是正确的,在特定情况下可能非常方便,例如,如果您的输入源出现问题并且它没有产生任何输入,您仍然可以使用 ProcessingTime
触发器作为安全开关仍然关闭待定 windows.
请注意,如果您的代码编写正确,这应该不会导致任何问题。