Flink CEP 事件未触发
Flink CEP Event Not triggering
我已经在 Flink 中实现了 CEP 模式,它按预期工作连接到本地 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我正在使用 AscendingTimestampExtractor,
consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>() {
@Override
public long extractAscendingTimestamp(ObjectNode objectNode) {
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
}
});
而且我还收到警告消息,
AscendingTimestampExtractor:140 - Timestamp monotony violated: 1594017872227 < 1594017873133
我还尝试使用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks none 其中一个正在工作
我附上了未分配 Watermark 的 Flink 控制台屏幕截图。
Updated flink console screenshot
有人能帮忙吗?
CEP 必须首先根据水印对输入流进行排序。所以
问题可能出在水印上,但您没有向我们展示足够的信息来调试原因。一个常见问题是 idle source,这会阻止水印前进。
但还有其他可能的原因。为了调试这种情况,我建议你查看一些指标,要么在 Flink Web UI 中,要么在一个指标系统中,如果你有一个连接的话。首先,通过在管道的不同阶段查看 numRecordsIn
、numRecordsOut
或 numRecordsInPerSecond
和 numRecordsOutPerSecond
来检查记录是否在流动。
如果有活动,则查看 currentOutputWatermark
贯穿您工作的不同任务,看看活动时间是否提前。
更新:
看来您可能在 Kafka 消费者上调用 assignTimestampsAndWatermarks
,这将导致每个分区加水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,并且会阻止整个水印。尝试在源生成的 DataStream 上调用 assignTimestampsAndWatermarks
,看看是否能解决问题。 (当然,如果没有按分区加水印,您将无法使用 AscendingTimestampExtractor,因为流不会按顺序排列。)
我已经在 Flink 中实现了 CEP 模式,它按预期工作连接到本地 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我正在使用 AscendingTimestampExtractor,
consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>() {
@Override
public long extractAscendingTimestamp(ObjectNode objectNode) {
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
}
});
而且我还收到警告消息,
AscendingTimestampExtractor:140 - Timestamp monotony violated: 1594017872227 < 1594017873133
我还尝试使用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks none 其中一个正在工作
我附上了未分配 Watermark 的 Flink 控制台屏幕截图。 Updated flink console screenshot
有人能帮忙吗?
CEP 必须首先根据水印对输入流进行排序。所以 问题可能出在水印上,但您没有向我们展示足够的信息来调试原因。一个常见问题是 idle source,这会阻止水印前进。
但还有其他可能的原因。为了调试这种情况,我建议你查看一些指标,要么在 Flink Web UI 中,要么在一个指标系统中,如果你有一个连接的话。首先,通过在管道的不同阶段查看 numRecordsIn
、numRecordsOut
或 numRecordsInPerSecond
和 numRecordsOutPerSecond
来检查记录是否在流动。
如果有活动,则查看 currentOutputWatermark
贯穿您工作的不同任务,看看活动时间是否提前。
更新:
看来您可能在 Kafka 消费者上调用 assignTimestampsAndWatermarks
,这将导致每个分区加水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,并且会阻止整个水印。尝试在源生成的 DataStream 上调用 assignTimestampsAndWatermarks
,看看是否能解决问题。 (当然,如果没有按分区加水印,您将无法使用 AscendingTimestampExtractor,因为流不会按顺序排列。)