为什么 AssignerWithPunctuatedWatermarks 在我的数据流中不起作用?
Why AssignerWithPunctuatedWatermarks does not work in my data stream?
我有一些问题。
我在程序中间使用 .assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks(60000)) (在一些过滤器、地图和其他 apache flink 运算符之后)。
我有简单的模式:
begin("start", AfterMatchSkipStrategy.skipToLast("end"))
.where(new SimpleConditionA())
.followedBy("end")
.where(new SimpleConditionB())
.within(Times.minutes(5));
我读了一条来自 Apache kafka 的消息(下面是事件时间戳)
消息 A:A.timestamp = 11:50:00
消息B:B.timestamp = 11:51:00
public class MyAssignerWithPunctuatedWatermark implements AssignerWithPunctuatedWatermark{
private long maxOutOfOrderness;
private long currentMaxTimestamp;
/*there are a Constructor*/
public long extractTimestamp (Event e, long l){
long timestamp = e.timestamp;
if(timestamp > currentMaxTimestamp){
currentMaxTimestamp = timestamp;
}
return timestamp;
}
public Watermark checkAndGetNextWatermark(Event e, long l){
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
然后我在 Apache Kafka 中放入了一条消息 A - 没问题!
然后我在 Apache Kafka 中放入了一条消息 B - 并且没有组装模式。 (我在 SimpleCondtionB 中使用日志记录,消息 B 不在 SimpleCondtion 中)。为什么????
当我使用 .assignTimestampsAndWatermarks(new IngestionTimeExtractor()) 时。一切都很好。但是我需要参加一个迟到(迟到不超过1分钟)的活动。
有必要 assignTimestampsAndWatermarks
在 任何依赖于时间的操作之前 完成。 CEP 依赖 assignTimestampsAndWatermarks
提供的时间信息来对输入流进行排序,并处理 within(duration)
约束。
为了将模式与可能的 out-of-order 事件时间流正确匹配,CEP 必须首先按时间戳对传入事件进行排序。作为其中的一部分,每个传入事件都保存在缓冲区中,直到当前水印超过其时间戳——因为到那时,更早的事件可能仍会到达(不被视为 late ).时间戳在当前水印之前的延迟事件要么被 CEP 丢弃,要么被发送到侧输出(如果配置了一个)。
因此,为了让消息 B 被您的模式处理,您必须首先注入一个时间戳足够晚于 11:51:00(消息 B 的时间)的事件以满足 maxOutOfOrderness
你在水印生成器中使用过。
对于 IngestionTimeExtractor
事件不能 out-of-order,因此不需要此排序步骤,可以立即处理消息 B。
我有一些问题。
我在程序中间使用 .assignTimestampsAndWatermarks(new MyAssignerWithPunctuatedWatermarks(60000)) (在一些过滤器、地图和其他 apache flink 运算符之后)。
我有简单的模式:
begin("start", AfterMatchSkipStrategy.skipToLast("end"))
.where(new SimpleConditionA())
.followedBy("end")
.where(new SimpleConditionB())
.within(Times.minutes(5));
我读了一条来自 Apache kafka 的消息(下面是事件时间戳)
消息 A:A.timestamp = 11:50:00
消息B:B.timestamp = 11:51:00
public class MyAssignerWithPunctuatedWatermark implements AssignerWithPunctuatedWatermark{
private long maxOutOfOrderness;
private long currentMaxTimestamp;
/*there are a Constructor*/
public long extractTimestamp (Event e, long l){
long timestamp = e.timestamp;
if(timestamp > currentMaxTimestamp){
currentMaxTimestamp = timestamp;
}
return timestamp;
}
public Watermark checkAndGetNextWatermark(Event e, long l){
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
然后我在 Apache Kafka 中放入了一条消息 A - 没问题!
然后我在 Apache Kafka 中放入了一条消息 B - 并且没有组装模式。 (我在 SimpleCondtionB 中使用日志记录,消息 B 不在 SimpleCondtion 中)。为什么????
当我使用 .assignTimestampsAndWatermarks(new IngestionTimeExtractor()) 时。一切都很好。但是我需要参加一个迟到(迟到不超过1分钟)的活动。
有必要 assignTimestampsAndWatermarks
在 任何依赖于时间的操作之前 完成。 CEP 依赖 assignTimestampsAndWatermarks
提供的时间信息来对输入流进行排序,并处理 within(duration)
约束。
为了将模式与可能的 out-of-order 事件时间流正确匹配,CEP 必须首先按时间戳对传入事件进行排序。作为其中的一部分,每个传入事件都保存在缓冲区中,直到当前水印超过其时间戳——因为到那时,更早的事件可能仍会到达(不被视为 late ).时间戳在当前水印之前的延迟事件要么被 CEP 丢弃,要么被发送到侧输出(如果配置了一个)。
因此,为了让消息 B 被您的模式处理,您必须首先注入一个时间戳足够晚于 11:51:00(消息 B 的时间)的事件以满足 maxOutOfOrderness
你在水印生成器中使用过。
对于 IngestionTimeExtractor
事件不能 out-of-order,因此不需要此排序步骤,可以立即处理消息 B。