Scala 中单一模式的 Flink CEP 迭代条件

Flink CEP iterative condition for single pattern in Scala

我面临的问题是我无法在 Scala 中对单个 CEP 模式执行求和。我想检测特定客户 ID 的总和何时大于 6100。我正在向 CEP.pattern(...) 提供键控流。我在下面提供了用于模式构建的代码。

val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {

      lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

      print((sum + v.get("amount").toString.toLong).toString)
     //print(sum+v.get("amount").toString.toLong>6100)
      //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
      (sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
    }).oneOrMore

我的输入是 avro 格式,Flink 从 kafka 使用它。输入看起来像这样 -:

{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```

但是,下面的代码在使用两种模式时运行良好-:

val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {

      lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

      print((sum + v.get("amount").toString.toLong).toString)
     //print(sum+v.get("amount").toString.toLong>6100)
      //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
      (sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
    }).oneOrMore

getEventsForPattern returns 值已与模式匹配。让我们来分析一下客户 27。处理事件时

{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}

您的第一个代码段拒绝了此邮件,因为它不满足条件:sum + amount = 0 + 6094 < 6100。处理时

{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}

您的条件将再次检查是否 0 + 547 > 6100,这就是您看不到任何输出的原因。

在您的第二个示例中,您使用的是 followedBy 运算符,这意味着您将处理成对的元素。第一笔交易被无条件接受(因为您不包括 where 运算符),现在它将通过 ctx.getEventsForPattern("start") 调用返回。我希望您了解这段代码的行为。


CEP 主要用于发现数据中的模式,而不是聚合它们。您的问题可以通过先开窗再过滤来解决 - 无需在此处使用 CEP