Flink - timesOrMore 的行为
Flink - behaviour of timesOrMore
我想找到后续事件的模式
内部模式为:
- 键值相同 "sensorArea"。
- 键 "customerId" 具有不同的值。
- 彼此相距不到 5 秒。
而且这个模式需要
- 仅当前面发生 3 次或更多次时才发出 "alert"。
我写了一些东西,但我确定它不完整。
两个问题
当我处于 "next" 模式时,我需要访问以前的事件字段,我怎么能不使用 ctx 命令就可以做到这一点,因为它很重..
我的代码带来了奇怪的结果——这是我的输入
我的输出是
3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}
即使我想要的输出应该是所有前六个事件(用户 111/222 和传感器是 33,然后是 44,然后是 55
Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
.followedBy("second").where(new IterativeCondition<Customer>() {
@Override
public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
int i = firstPatternEvents.size();
int currSensorData = currCustomerEvent.getSensorData();
int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
int currCustomerId = currCustomerEvent.getCustomerId();
int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
}
})
.within(Time.seconds(5))
.timesOrMore(3);
PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);
如果你先通过sensorArea键控流,你会更容易。他们将在所有事件都针对单个 sensorArea 的流上进行模式匹配,这将使模式更易于表达,并且匹配效率更高。
您无法避免使用迭代条件和 ctx,但在键入流后它应该更便宜。
另外,您的代码示例与文字描述不符。文本显示 "within 5 seconds" 和“3 次或更多次”,而代码显示 within(Time.seconds(2))
和 timesOrMore(2)
.
我想找到后续事件的模式
内部模式为:
- 键值相同 "sensorArea"。
- 键 "customerId" 具有不同的值。
- 彼此相距不到 5 秒。
而且这个模式需要
- 仅当前面发生 3 次或更多次时才发出 "alert"。
我写了一些东西,但我确定它不完整。
两个问题
当我处于 "next" 模式时,我需要访问以前的事件字段,我怎么能不使用 ctx 命令就可以做到这一点,因为它很重..
我的代码带来了奇怪的结果——这是我的输入
我的输出是
3> {first=[Customer[timestamp=50,customerId=111,toAdd=2,sensorData=33]], second=[Customer[timestamp=100,customerId=222,toAdd=2,sensorData=33], Customer[timestamp=600,customerId=333,toAdd=2,sensorData=33]]}
即使我想要的输出应该是所有前六个事件(用户 111/222 和传感器是 33,然后是 44,然后是 55
Pattern<Customer, ?> sameUserDifferentSensor = Pattern.<Customer>begin("first", skipStrategy)
.followedBy("second").where(new IterativeCondition<Customer>() {
@Override
public boolean filter(Customer currCustomerEvent, Context<Customer> ctx) throws Exception {
List<Customer> firstPatternEvents = Lists.newArrayList(ctx.getEventsForPattern("first"));
int i = firstPatternEvents.size();
int currSensorData = currCustomerEvent.getSensorData();
int prevSensorData = firstPatternEvents.get(i-1).getSensorData();
int currCustomerId = currCustomerEvent.getCustomerId();
int prevCustomerId = firstPatternEvents.get(i-1).getCustomerId();
return currSensorData==prevSensorData && currCustomerId!=prevCustomerId;
}
})
.within(Time.seconds(5))
.timesOrMore(3);
PatternStream<Customer> sameUserDifferentSensorPatternStream = CEP.pattern(customerStream, sameUserDifferentSensor);
DataStream<String> alerts1 = sameUserDifferentSensorPatternStream.select((PatternSelectFunction<Customer, String>) Object::toString);
如果你先通过sensorArea键控流,你会更容易。他们将在所有事件都针对单个 sensorArea 的流上进行模式匹配,这将使模式更易于表达,并且匹配效率更高。
您无法避免使用迭代条件和 ctx,但在键入流后它应该更便宜。
另外,您的代码示例与文字描述不符。文本显示 "within 5 seconds" 和“3 次或更多次”,而代码显示 within(Time.seconds(2))
和 timesOrMore(2)
.