Scala 中单一模式的 Flink CEP 迭代条件
Flink CEP iterative condition for single pattern in Scala
我面临的问题是我无法在 Scala 中对单个 CEP 模式执行求和。我想检测特定客户 ID 的总和何时大于 6100。我正在向 CEP.pattern(...) 提供键控流。我在下面提供了用于模式构建的代码。
val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
我的输入是 avro 格式,Flink 从 kafka 使用它。输入看起来像这样 -:
{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```
但是,下面的代码在使用两种模式时运行良好-:
val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
getEventsForPattern
returns 值已与模式匹配。让我们来分析一下客户 27
。处理事件时
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
您的第一个代码段拒绝了此邮件,因为它不满足条件:sum + amount = 0 + 6094 < 6100
。处理时
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}
您的条件将再次检查是否 0 + 547 > 6100
,这就是您看不到任何输出的原因。
在您的第二个示例中,您使用的是 followedBy
运算符,这意味着您将处理成对的元素。第一笔交易被无条件接受(因为您不包括 where
运算符),现在它将通过 ctx.getEventsForPattern("start")
调用返回。我希望您了解这段代码的行为。
CEP
主要用于发现数据中的模式,而不是聚合它们。您的问题可以通过先开窗再过滤来解决 - 无需在此处使用 CEP
。
我面临的问题是我无法在 Scala 中对单个 CEP 模式执行求和。我想检测特定客户 ID 的总和何时大于 6100。我正在向 CEP.pattern(...) 提供键控流。我在下面提供了用于模式构建的代码。
val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
我的输入是 avro 格式,Flink 从 kafka 使用它。输入看起来像这样 -:
{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```
但是,下面的代码在使用两种模式时运行良好-:
val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
getEventsForPattern
returns 值已与模式匹配。让我们来分析一下客户 27
。处理事件时
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
您的第一个代码段拒绝了此邮件,因为它不满足条件:sum + amount = 0 + 6094 < 6100
。处理时
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}
您的条件将再次检查是否 0 + 547 > 6100
,这就是您看不到任何输出的原因。
在您的第二个示例中,您使用的是 followedBy
运算符,这意味着您将处理成对的元素。第一笔交易被无条件接受(因为您不包括 where
运算符),现在它将通过 ctx.getEventsForPattern("start")
调用返回。我希望您了解这段代码的行为。
CEP
主要用于发现数据中的模式,而不是聚合它们。您的问题可以通过先开窗再过滤来解决 - 无需在此处使用 CEP
。