全局 Window 自定义触发器上的 allowedLateness
allowedLateness on Global Window custom trigger
我已经为我的事件流创建了自定义触发器和处理函数。
DataStream<DynamoDBRow> dynamoDBRows =
sensorEvents
.keyBy("id")
.window(GlobalWindows.create())
.trigger(new MyCustomTrigger())
.allowedLateness(Time.minutes(1)) # Note
.process(new MyCustomWindowProcessFunction());
我的触发器基于事件参数。收到事件结束信号后,MyCustomWindowProcessFunction() 将应用于 window 元素。
@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
传感器数据可能很少,即使在触发器之后也可能出现。所以我添加了 .allowedLateness(Time.minutes(1))
,以确保在处理过程中不会遗漏这些事件。
就我而言,allowedLateness 不起作用。
查阅文件后,我发现了这个
如何在 GlobalWindow 中包含 allowedLateness?
注意: 我也试过设置环境时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
更新:20-02-2020
目前正在考虑以下方法。 (目前没有工作)
@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
private final long allowedLatenessMillis;
public JourneyTrigger(Time allowedLateness) {
this.allowedLatenessMillis = allowedLateness.toMilliseconds();
}
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
log.info("onEvenTime called at "+System.currentTimeMillis() );
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
老实说,我没有理由在这里使用 GlobalWindow
。您可以只使用与您的 Trigger
具有相同目的的 KeyedProcessFunction
,基本上,它会将事件开始到事件结束的所有元素收集到 ListState
然后当您收到 isEventEnd()==true
时,您可以简单地安排 EventTime
计时器,该计时器将在一分钟后触发并发出 ListState
.
中收集的结果
最后,我能够使用下面的自定义触发器实现我的要求。
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
private final long allowedLatenessMillis;
public JourneyTrigger(Time allowedLateness) {
this.allowedLatenessMillis = allowedLateness.toMilliseconds();
}
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd()==true) {
log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
ctx.registerProcessingTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
log.info("onProcessingTime called at "+System.currentTimeMillis() );
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
同样在Driver.java
class,设置环境Time Characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
我已经为我的事件流创建了自定义触发器和处理函数。
DataStream<DynamoDBRow> dynamoDBRows =
sensorEvents
.keyBy("id")
.window(GlobalWindows.create())
.trigger(new MyCustomTrigger())
.allowedLateness(Time.minutes(1)) # Note
.process(new MyCustomWindowProcessFunction());
我的触发器基于事件参数。收到事件结束信号后,MyCustomWindowProcessFunction() 将应用于 window 元素。
@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
传感器数据可能很少,即使在触发器之后也可能出现。所以我添加了 .allowedLateness(Time.minutes(1))
,以确保在处理过程中不会遗漏这些事件。
就我而言,allowedLateness 不起作用。
查阅文件后,我发现了这个
如何在 GlobalWindow 中包含 allowedLateness?
注意: 我也试过设置环境时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
更新:20-02-2020
目前正在考虑以下方法。 (目前没有工作)
@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
private final long allowedLatenessMillis;
public JourneyTrigger(Time allowedLateness) {
this.allowedLatenessMillis = allowedLateness.toMilliseconds();
}
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
log.info("onEvenTime called at "+System.currentTimeMillis() );
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
老实说,我没有理由在这里使用 GlobalWindow
。您可以只使用与您的 Trigger
具有相同目的的 KeyedProcessFunction
,基本上,它会将事件开始到事件结束的所有元素收集到 ListState
然后当您收到 isEventEnd()==true
时,您可以简单地安排 EventTime
计时器,该计时器将在一分钟后触发并发出 ListState
.
最后,我能够使用下面的自定义触发器实现我的要求。
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
private final long allowedLatenessMillis;
public JourneyTrigger(Time allowedLateness) {
this.allowedLatenessMillis = allowedLateness.toMilliseconds();
}
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd()==true) {
log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
ctx.registerProcessingTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
log.info("onProcessingTime called at "+System.currentTimeMillis() );
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
同样在Driver.java
class,设置环境Time Characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);