Apache Flink - Send event if no data was received for x minutes
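How can I implement an operator with Flink's DataStream API that sends an event when no data has been received from a stream for a certain amount of time?
You could set up a time window with a custom trigger function. In the trigger, every time an event is received, the "onElement" method registers a processing-time timer for "currentTime + desiredTimeDelay". When the next event arrives, you delete the timer that was registered earlier and register a new one. If no event arrives before the system time reaches the timer's timestamp, the timer fires and the window is processed. If no events arrived in the meantime, the list of events to process is simply empty.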
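The reset-on-every-event idea described above can be modelled without any Flink classes. The following is only a minimal sketch under stated assumptions: the class name `ResettableTimeout` and its methods are hypothetical, and the clock is passed in as a plain `long` rather than read from Flink's trigger context.

```java
import java.util.OptionalLong;

// Plain-Java model of "replace the pending timer on every event".
// Hypothetical names; this is not Flink's Trigger API.
class ResettableTimeout {
    private final long delayMs;
    // Deadline of the single pending timer, or empty if none is set.
    private OptionalLong deadline = OptionalLong.empty();

    ResettableTimeout(long delayMs) {
        this.delayMs = delayMs;
    }

    // Called for each event: replaces any pending timer with a new one.
    void onEvent(long nowMs) {
        deadline = OptionalLong.of(nowMs + delayMs);
    }

    // Called as the clock advances: returns true once when the deadline has passed.
    boolean advanceTo(long nowMs) {
        if (deadline.isPresent() && nowMs >= deadline.getAsLong()) {
            deadline = OptionalLong.empty(); // the timer fired, so clear it
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ResettableTimeout timeout = new ResettableTimeout(60_000L);
        timeout.onEvent(0L);       // timer set for t = 60 s
        timeout.onEvent(30_000L);  // timer moved to t = 90 s
        System.out.println(timeout.advanceTo(60_000L));  // false: the old deadline was replaced
        System.out.println(timeout.advanceTo(90_000L));  // true: no event for 60 s
        System.out.println(timeout.advanceTo(120_000L)); // false: nothing pending
    }
}
```

Because only one deadline is ever pending, an event that arrives in time silently cancels the alert that would otherwise have fired.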
Such an operator can be implemented using a ProcessFunction.
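import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);
input
    // use keyBy to have keyed state.
    // NullByteKeySelector will move all data to one task. You can also use other keys
    .keyBy(new NullByteKeySelector<>())
    // use process function with 60 seconds timeout
    .process(new TimeOutFunction(60 * 1000));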
TimeOutFunction is defined as follows. This example uses processing time.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {

    // delay after which an alert flag is thrown
    private final long timeOut;
    // state to remember the last timer set
    private transient ValueState<Long> lastTimer;

    public TimeOutFunction(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void open(Configuration conf) {
        // setup timer state
        ValueStateDescriptor<Long> lastTimerDesc =
            new ValueStateDescriptor<>("lastTimer", Long.class);
        lastTimer = getRuntimeContext().getState(lastTimerDesc);
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
        // get current time and compute timeout time
        long currentTime = ctx.timerService().currentProcessingTime();
        long timeoutTime = currentTime + timeOut;
        // register timer for timeout time
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        // remember timeout time
        lastTimer.update(timeoutTime);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
        // check if this was the last timer we registered
        if (timestamp == lastTimer.value()) {
            // it was, so no data was received afterwards.
            // fire an alert.
            out.collect(true);
        }
    }
}
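Unlike the trigger approach, the function above never deletes a timer: every registered timer eventually fires, and onTimer ignores all but the most recent one. The following plain-Java trace illustrates that bookkeeping. It is only a sketch: `TimeoutTrace` and its sorted set of pending timers are hypothetical stand-ins for Flink's timer service and keyed state, and time is supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java trace of the timer bookkeeping; a stand-in for Flink's timer service, not its API.
class TimeoutTrace {
    private final long timeOut;
    // All registered timers, ordered by timestamp; none is ever deleted before it fires.
    private final TreeSet<Long> pendingTimers = new TreeSet<>();
    // Mirrors the "lastTimer" state: the timestamp of the most recently registered timer.
    private Long lastTimer = null;

    TimeoutTrace(long timeOut) {
        this.timeOut = timeOut;
    }

    // Mirrors processElement: register a timer and remember it as the latest one.
    void processElement(long nowMs) {
        long timeoutTime = nowMs + timeOut;
        pendingTimers.add(timeoutTime);
        lastTimer = timeoutTime;
    }

    // Fires every timer due at or before nowMs; returns the timestamps that raised an alert.
    List<Long> advanceTo(long nowMs) {
        List<Long> alerts = new ArrayList<>();
        while (!pendingTimers.isEmpty() && pendingTimers.first() <= nowMs) {
            long timestamp = pendingTimers.pollFirst();
            // Mirrors onTimer: only the most recently registered timer raises an alert.
            if (lastTimer != null && timestamp == lastTimer) {
                alerts.add(timestamp);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        TimeoutTrace trace = new TimeoutTrace(60_000L);
        trace.processElement(0L);       // timer at t = 60 s
        trace.processElement(20_000L);  // timer at t = 80 s, now the "last" timer
        System.out.println(trace.advanceTo(70_000L)); // prints [] because the 60 s timer is stale
        System.out.println(trace.advanceTo(80_000L)); // prints [80000]
    }
}
```

The trade-off in this design is that stale timers stay registered until they fire, in exchange for not having to track and delete the previous timer on every element.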