Flink CEP 事件未触发

Flink CEP Event Not triggering

我已经在 Flink 中实现了 CEP 模式,它按预期工作连接到本地 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //saves checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我正在使用 AscendingTimestampExtractor,

consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() {
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) {
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      }
    });

而且我还收到警告消息,

AscendingTimestampExtractor:140 - Timestamp monotony violated: 1594017872227 < 1594017873133

我还尝试使用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks none 其中一个正在工作

我附上了未分配 Watermark 的 Flink 控制台屏幕截图。 Updated flink console screenshot

有人能帮忙吗?

CEP 必须首先根据水印对输入流进行排序。所以 问题可能出在水印上,但您没有向我们展示足够的信息来调试原因。一个常见问题是 idle source,这会阻止水印前进。

但还有其他可能的原因。为了调试这种情况,我建议你查看一些指标,要么在 Flink Web UI 中,要么在一个指标系统中,如果你有一个连接的话。首先,通过在管道的不同阶段查看 numRecordsInnumRecordsOutnumRecordsInPerSecondnumRecordsOutPerSecond 来检查记录是否在流动。

如果有活动,则查看 currentOutputWatermark 贯穿您工作的不同任务,看看活动时间是否提前。

更新:

看来您可能在 Kafka 消费者上调用 assignTimestampsAndWatermarks,这将导致每个分区加水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,并且会阻止整个水印。尝试在源生成的 DataStream 上调用 assignTimestampsAndWatermarks,看看是否能解决问题。 (当然,如果没有按分区加水印,您将无法使用 AscendingTimestampExtractor,因为流不会按顺序排列。)