Flink - timesOrMore 的行为

Flink - behaviour of timesOrMore

我想找到后续事件的模式

内部模式为:

  1. 键值相同 "sensorArea"。
  2. 键 "customerId" 具有不同的值。
  3. 彼此相距不到 5 秒。

而且这个模式需要

  1. 仅当前面发生 3 次或更多次时才发出 "alert"。

我写了一些东西,但我确定它不完整。

两个问题

  1. 当我处于 "next" 模式时,我需要访问以前的事件字段,我怎么能不使用 ctx 命令就可以做到这一点,因为它很重..

  2. 我的代码带来了奇怪的结果——这是我的输入

我的输出是

3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}

即使我想要的输出应该是所有前六个事件(用户 111/222 和传感器是 33,然后是 44,然后是 55

Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
            .followedBy("second").where(new IterativeCondition<Customer>() {
                @Override
                public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
                    List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
                    int i = firstPatternEvents.size();
                    int currSensorData = currCustomerEvent.getSensorData();
                    int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
                    int currCustomerId = currCustomerEvent.getCustomerId();
                    int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
                    return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
                }
            })
            .within(Time.seconds(5))
            .timesOrMore(3);



    PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
    DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);

如果你先通过sensorArea键控流,你会更容易。他们将在所有事件都针对单个 sensorArea 的流上进行模式匹配,这将使模式更易于表达,并且匹配效率更高。

您无法避免使用迭代条件和 ctx,但在键入流后它应该更便宜。

另外,您的代码示例与文字描述不符。文本显示 "within 5 seconds" 和“3 次或更多次”,而代码显示 within(Time.seconds(2))timesOrMore(2).