Flink CEP Pattern does not match for first events after starting job and always matches previous events set

I want to match a CEP pattern in Flink 1.4.0 Streaming with the following code:

    DataStream<Event> input = inputFromSocket.map(new IncomingMessageProcessor()).filter(new FilterEmptyAndInvalidEvents());

    DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
    KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());

    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new ActionCondition("action1"))
    .followedBy("middle").where(new ActionCondition("action2"))
    .followedBy("end").where(new ActionCondition("action3"));

    pattern = pattern.within(Time.seconds(30));

    PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

Event is just a POJO:

public class Event {
    private UUID id;
    private String action;
    private String senderID;
    private long occurrenceTimeStamp;
    ......
}

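The events are pulled from my custom source (Google PubSub). The first filter, FilterEmptyAndInvalidEvents(), only filters out events that are malformed and the like, which does not happen in this case; I can verify that from the log output. So every event runs through the MyKeySelector.getKey() method.

BoundedOutOfOrdernessGenerator just extracts the timestamp from one field: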

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
    private static Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);
    private final long maxOutOfOrderness = 5500; // 5.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long timestamp = element.getOccurrenceTimeStamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        return newWatermark;
    }
}

MyKeySelector just extracts a String value from a field:

public class MyKeySelector implements KeySelector<Event, String> {
    private static Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);

    @Override
    public String getKey(Event value) throws Exception {
        String senderID = value.getSenderID();
        LOG.info("Partioning event {} by key {}", value, senderID);
        return senderID;
    }
}

ActionCondition simply compares one field of the event and looks like this:

public class ActionCondition extends SimpleCondition<Event> {
    private static Logger LOG = LoggerFactory.getLogger(ActionCondition.class);

    private String filterForCommand = "";

    public ActionCondition(String filterForCommand) {
        this.filterForCommand = filterForCommand;
    }

    @Override
    public boolean filter(Event value) throws Exception {
        LOG.info("Filtering event for {} action: {}", filterForCommand, value);

        if (value == null) {
            return false;
        }

        if (value.getAction() == null) {
            return false;
        }

        if (value.getAction().equals(filterForCommand)) {
            LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
            return true;
        } else {
            LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
            return false;
        }
    }
}

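Unfortunately, when I start the job and send events that should match the pattern, they are received and partitioned correctly, but the CEP pattern does not match.

For example, I send the following events:

  1. action1
  2. action2
  3. action3

In the log output of the Flink job I can see that the events run through the MyKeySelector.getKey() method correctly, because I added log output there. So the events appear to arrive in the stream correctly, but unfortunately they are not matched by the pattern.

The log output looks like this: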

FilterEmptyAndInvalidEvents   - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
MyKeySelector  - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents   - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
MyKeySelector  - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents   - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
MyKeySelector  - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector  - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector  - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector  - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA

The TimeCharacteristic is set to EventTime via

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

and the events contain correct timestamps.

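If I now send another 3 events with the same actions (but with new timestamps, etc.):

  1. action1
  2. action2
  3. action3

the pattern matches for the first set of events. I know it is the first set because, for debugging purposes, I tagged every event with a GUID and print it for the matched events.

When I send a 3rd, 4th, ... set of these 3 events, it is always the previous set that gets matched. So there seems to be a kind of "offset" in the pattern detection. It does not look like a timing issue, though, because the first set of events is not matched even if I wait a long time after sending it (and see that the events were partitioned by Flink).

Is there anything wrong with my code? Why does Flink always match only the previous set of events against the pattern?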

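I did solve it. I had been searching for the cause in the streaming source, but my event handling was actually totally fine. The problem was that my watermark did not advance continuously. As you can see in the code above, the watermark only moves forward when an event is received.

But after sending the first 3 events, no further events arrived in my setup. Therefore, the watermark never advanced again.

And because no new watermark with a timestamp greater than that of the last received event in the sequence was ever created, Flink never processed these elements. The reason can be found here: Flink CEP - Handling Lateness in Event Time

The important sentence is: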

...and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed.

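So, because I generated the watermarks in BoundedOutOfOrdernessGenerator with a delay of 5.5 seconds, the latest watermark was always 5.5 seconds earlier than the timestamp of the last event. Therefore, the events stayed in the buffer and were never processed until a later event pushed the watermark past them.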
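
The "offset" effect can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink's actual implementation: the class name `WatermarkBufferDemo` and its `onEvent` method are hypothetical, and the buffer simply models the quoted rule that only elements with timestamps smaller than the watermark are processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical plain-Java model (no Flink dependency) of an event-time buffer
// that releases elements only once the watermark has passed their timestamp.
class WatermarkBufferDemo {
    static final long MAX_OUT_OF_ORDERNESS = 5500L; // same bound as in the question

    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    // Buffers the event, advances the watermark from the highest timestamp seen,
    // and returns the timestamps that become eligible for matching as a result.
    List<Long> onEvent(long timestamp) {
        buffer.add(timestamp);
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        long watermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;

        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() < watermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    public static void main(String[] args) {
        WatermarkBufferDemo demo = new WatermarkBufferDemo();
        // First set: three events one millisecond apart. Nothing is released,
        // because the watermark trails the newest event by 5.5 seconds.
        System.out.println(demo.onEvent(1_000L)); // []
        System.out.println(demo.onEvent(1_001L)); // []
        System.out.println(demo.onEvent(1_002L)); // []
        // First event of the second set, 20 seconds later: the watermark jumps
        // to 15500 and only now releases the whole first set.
        System.out.println(demo.onEvent(21_000L)); // [1000, 1001, 1002]
    }
}
```

In this model the first set only becomes visible when the second set arrives, which is the behavior described in the question.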

So one way to solve this is to periodically generate watermarks that assume a specific delay for incoming events. To do so, we need to call setAutoWatermarkInterval on the ExecutionConfig:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
..
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(1000L);

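This lets Flink call the watermark generator periodically at the given interval (in this case every second) and pull a new watermark.

In addition, we need to adjust the timestamp/watermark generator so that it emits advancing watermarks even when no new events are flowing in. For this I adapted the BoundedOutOfOrdernessTimestampExtractor.java that ships with Flink: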

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessGenerator() {
        Time maxOutOfOrderness = Time.seconds(5);

        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness
                    + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public long extractTimestamp(Event element) {
        long timestamp = element.getOccurrenceTimeStamp();
        return timestamp;
    }

    @Override
    public final Watermark getCurrentWatermark() {
        Instant instant = Instant.now();
        long nowTimestampMillis = instant.toEpochMilli();
        long latenessTimestamp = nowTimestampMillis - maxOutOfOrderness;

        if (latenessTimestamp >= currentMaxTimestamp) {
            currentMaxTimestamp = latenessTimestamp;
        }

        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(Event element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

As you can see in getCurrentWatermark(), I take the current epoch timestamp, subtract the maximum delay we expect, and then create the watermark from that timestamp.

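Now Flink pulls a new watermark every second, and the watermark always "lags" behind by the configured delay. This allows events to be matched against the defined pattern a few seconds after the last event was received. (Note that when no events arrive, the code above subtracts maxOutOfOrderness twice, once from the wall clock and once from currentMaxTimestamp, so the idle watermark trails the wall clock by 10 seconds and the match can take up to 10 seconds rather than 5.)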
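
The arithmetic of getCurrentWatermark() can be traced with a fake clock. This is a plain-Java sketch under stated assumptions: the class `IdleAwareWatermarkDemo` and the injected `LongSupplier` clock are hypothetical additions for deterministic checking, and the real generator reads `Instant.now()` instead.

```java
import java.util.function.LongSupplier;

// Hypothetical plain-Java model of the generator's arithmetic. The clock is
// injected (an assumption for testing) instead of calling Instant.now().
class IdleAwareWatermarkDemo {
    private final long maxOutOfOrderness;
    private final LongSupplier clock;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkDemo(long maxOutOfOrderness, LongSupplier clock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.clock = clock;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Mirrors extractTimestamp(): remember the highest event timestamp seen.
    void onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // Mirrors getCurrentWatermark(): let the wall clock push the maximum
    // forward, then subtract the bound; the result never moves backwards.
    long currentWatermark() {
        long latenessTimestamp = clock.getAsLong() - maxOutOfOrderness;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, latenessTimestamp);
        long potentialWatermark = currentMaxTimestamp - maxOutOfOrderness;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, potentialWatermark);
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long[] now = {100_000L}; // fake wall clock in milliseconds
        IdleAwareWatermarkDemo gen = new IdleAwareWatermarkDemo(5_000L, () -> now[0]);

        gen.onEvent(100_000L);                      // last event of the set
        System.out.println(gen.currentWatermark()); // 95000

        now[0] = 104_000L;                          // 4 s of silence
        System.out.println(gen.currentWatermark()); // 95000

        now[0] = 111_000L;                          // 11 s of silence
        System.out.println(gen.currentWatermark()); // 101000, past the last event
    }
}
```

With a 5-second bound, the watermark in this model passes the last event's timestamp only once the wall clock is more than 10 seconds ahead of it, because the bound is subtracted both from the clock and from the running maximum.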

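Whether this works for you depends on your use case, because it also means that events whose timestamps are more than about 5 seconds older than the time at which Flink receives them (that is, below the current watermark) are dropped and no longer processed.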
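
The trade-off can be illustrated with a small plain-Java sketch. The class `LateEventDemo` and the helper `stillProcessed` are hypothetical, and the sketch assumes that an element counts as late when its timestamp is below the watermark, following the sentence quoted from the Flink documentation above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of late-event handling with a wall-clock-driven watermark.
class LateEventDemo {
    // Returns the timestamps that are not below the watermark. Assumption:
    // anything below the watermark is treated as late and dropped.
    static List<Long> stillProcessed(long watermark, long... timestamps) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts >= watermark) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long now = 200_000L;          // wall-clock time at arrival
        long watermark = now - 5_000L; // watermark lagging 5 s behind

        long fresh = now - 1_000L;    // occurred 1 s before arrival
        long delayed = now - 8_000L;  // occurred 8 s before arrival

        // Only the fresh event survives; the delayed one is below the watermark.
        System.out.println(stillProcessed(watermark, fresh, delayed)); // [199000]
    }
}
```

If the source can deliver events with more delay than this (for example after a backlog or a restart), a larger bound or a watermark that is not tied to the wall clock may be the safer choice.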