Flink 自定义触发器提供意外输出
Flink Custom Trigger giving Unexpected Output
我想创建一个 Trigger
,第一次在 20 秒内触发,此后每 5 秒触发一次。我使用了 GlobalWindows
和自定义 Trigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
这是TradeTrigger
中的代码:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
所以基本上,当 flag
是 false
时,即第一次,Trigger
应该在 20 秒内触发并将 flag
设置为 true
.从下一次开始,它应该每 5 秒触发一次。
我面临的问题是,每次触发 Trigger
时,我在输出中只收到一条消息。也就是说,我在 20 秒后收到一条消息,每五秒收到一条消息。
我期望每次触发时输出中有 20 条消息。
如果我使用 .timeWindow(Time.seconds(5))
并创建一个五秒的时间 window,我每 5 秒会在输出中收到 20 条消息。
请帮我把这段代码弄对。有什么我想念的吗?
您的触发器实施存在一些问题:
永远不要将函数的状态存储在静态变量中。 Flink 不隔离 JVM 中的用户进程。相反,它为每个 TaskManager 使用一个 JVM 并启动多个线程。因此,您的静态布尔标志在多个触发器实例之间共享。您应该存储可从 TriggerContext
访问的 Flink 的 ValueState
接口标志。 Flink 会注意检查你的状态并在发生故障时恢复它。
Trigger.onEvent()
仅在新事件到达时调用。因此它不能用于在特定时间触发 Window 计算。相反,您应该注册一个事件时间计时器或处理时间计时器(同样通过 TriggerContext
)。计时器将分别调用 Trigger.onEventTime()
或 Trigger.onProcessingTime()
。是否使用事件或处理时间取决于您的用例。
在 Fabian 和 Flink 邮件列表的帮助下,它开始工作了。
通过 TriggerContext
将状态存储在 ValueState
变量中。检查onEvent()
方法中的变量,如果是第一次,注册一个比当前时间多20秒的processingTimeTimer
并更新状态。在 onProcessingTime
方法中,注册另一个 ProcessingTimeTimer
比当前时间多 5 秒,更新状态并触发 Window
。
我想创建一个 Trigger
,第一次在 20 秒内触发,此后每 5 秒触发一次。我使用了 GlobalWindows
和自定义 Trigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
这是TradeTrigger
中的代码:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
所以基本上,当 flag
是 false
时,即第一次,Trigger
应该在 20 秒内触发并将 flag
设置为 true
.从下一次开始,它应该每 5 秒触发一次。
我面临的问题是,每次触发 Trigger
时,我在输出中只收到一条消息。也就是说,我在 20 秒后收到一条消息,每五秒收到一条消息。
我期望每次触发时输出中有 20 条消息。
如果我使用 .timeWindow(Time.seconds(5))
并创建一个五秒的时间 window,我每 5 秒会在输出中收到 20 条消息。
请帮我把这段代码弄对。有什么我想念的吗?
您的触发器实施存在一些问题:
永远不要将函数的状态存储在静态变量中。 Flink 不隔离 JVM 中的用户进程。相反,它为每个 TaskManager 使用一个 JVM 并启动多个线程。因此,您的静态布尔标志在多个触发器实例之间共享。您应该存储可从
TriggerContext
访问的 Flink 的ValueState
接口标志。 Flink 会注意检查你的状态并在发生故障时恢复它。Trigger.onEvent()
仅在新事件到达时调用。因此它不能用于在特定时间触发 Window 计算。相反,您应该注册一个事件时间计时器或处理时间计时器(同样通过TriggerContext
)。计时器将分别调用Trigger.onEventTime()
或Trigger.onProcessingTime()
。是否使用事件或处理时间取决于您的用例。
在 Fabian 和 Flink 邮件列表的帮助下,它开始工作了。
通过 TriggerContext
将状态存储在 ValueState
变量中。检查onEvent()
方法中的变量,如果是第一次,注册一个比当前时间多20秒的processingTimeTimer
并更新状态。在 onProcessingTime
方法中,注册另一个 ProcessingTimeTimer
比当前时间多 5 秒,更新状态并触发 Window
。