Can't apply pattern by key in KeyedStream with Flink CEP
I'm trying to emit an alert on every second event that has the same ID. To do that, I use keyBy() so that my operations are applied per ID.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.{TimeCharacteristic, scala}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
import org.apache.flink.cep.CEP
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.datastream.{DataStream, KeyedStream}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
class Pipeline {
//{...}
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
//{...}
var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input").name("Stream original")
var tupleStream = stream.map(new S2TMapFunction())
var newTupleStream : DataStream[(String,Double,Double,String,Int,Int)] = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
newTupleStream = newTupleStream.process(new RemoveLateDataProcessFunction).keyBy(new TupleKeySelector())
val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("teste").where(new EventoTesteConditionFunction(2)).times(1)
val patternStream = CEP.pattern(newTupleStream,pattern)
val result = patternStream.process(new EventoTestePatternProcessFunction())
result.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE)
env.execute()
}
This is my TupleKeySelector:
import org.apache.flink.api.java.functions.KeySelector
class TupleKeySelector() extends KeySelector[(String,Double,Double,String,Int,Int),Int]{
override def getKey(value: (String, Double, Double, String, Int, Int)): Int = {
value._6
}
}
This is my PlacasPunctualTimestampAssigner:
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
class PlacasPunctualTimestampAssigner extends AssignerWithPunctuatedWatermarks[(String,Double,Double,String,Int,Int)]{
var counter : Int = 0
var eventsUntilNextWatermark : Int = 0
var lastTimestamp : Long = _
override def checkAndGetNextWatermark(lastElement: (String, Double, Double, String, Int, Int), extractedTimestamp: Long): Watermark = {
if(counter == eventsUntilNextWatermark){
counter = 0
var time = new Timestamp(lastTimestamp)
println("Watermark: ",time.toString)
new Watermark(lastTimestamp)
}else{
null
}
}
override def extractTimestamp(element: (String,Double, Double,String,Int,Int), previousElementTimestamp: Long): Long = {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = formatter.parse(element._4)
val timestampAtual = new Timestamp(date.getTime).getTime
lastTimestamp = Math.max(lastTimestamp,timestampAtual)
//counter = counter + 1
timestampAtual
}
}
This is my EventoTesteConditionFunction:
import org.apache.flink.cep.pattern.conditions.IterativeCondition
class EventoTesteConditionFunction(var counter : Int) extends IterativeCondition[(String,Double,Double,String,Int,Int)] {
private var c : Int = 0
override def filter(value: (String, Double, Double, String, Int, Int), ctx: IterativeCondition.Context[(String, Double, Double, String, Int, Int)]): Boolean = {
if(c == counter-1){
println("Event: "+value._6.toString)
c = 0
true
}else{
c = c + 1
false
}
}
}
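According to the semantics Flink adopts, I should be able to achieve my goal by using the times(1) quantifier in my pattern, or even by omitting it. However, that is not what I get in the output. Here is my test case: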
//Input events:
(name,timestamp,id)
(a,2013-02-08 00:14:00,1)
(b,2013-02-08 00:15:00,1) // Event should be emitted here
(c,2013-02-08 00:15:10,1)
(d,2013-02-08 00:17:15,1) // Event should be emitted here
(e,2013-02-08 00:18:20,1)
(f,2013-02-08 00:19:30,0)
(g,2013-02-08 00:20:35.507,2)
(h,2013-02-08 00:21:39.906,2) // Event should be emitted here
(i,2013-02-08 00:22:03.499,2)
(j,2013-02-08 00:23:23.818,2) // Event should be emitted here
(k,2013-02-08 00:24:02.319,2)
(l,2013-02-08 00:25:30.971,0)
//Expected output(I'm returning only the id value):
1
1
2
2
//Real output:
1
1
0
2
2
0
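This test suggests that the pattern is not applied per key. Instead, all events are treated as belonging to one group, which is the behavior of a non-keyed DataStream.
If I set the quantifier to times(2), this is what I get from the same input: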
//Expected output:
1
2
//Real output:
0
0
My EventoTesteConditionFunction also prints strange output. It seems that some events are checked more than once while others are skipped.
//println from accepted events of EventoTesteConditionFunction for times(2) quantifier
Event: (b,1)
Event: (c,1)
Event: (d,1)
Event: (e,1)
Event: (f,0)
Event: (h,2)
Event: (i,2)
Event: (j,2)
Event: (k,2)
Event: (l,0)
Event: (l,0)
Given all this, my questions are:
- How can I apply the pattern per key?
- Why does Flink show such strange behavior?
The pattern you are using:
Pattern.begin[(String,Double,Double,String,Int,Int)]("teste").where(new EventoTesteConditionFunction(2)).times(1)
means that you want to emit every record that matches your EventoTesteConditionFunction. For your use case, you should use this pattern instead:
Pattern.begin[(String,Double,Double,String,Int,Int)]("teste").where(new AlwaysTrue()).times(2)
where AlwaysTrue returns true for every record.
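AlwaysTrue is not defined anywhere in the question, so here is a minimal sketch of what it could look like. It assumes the Java CEP API that the question already imports and extends SimpleCondition, which is a stateless IterativeCondition. The class body is an illustration, not code taken from the question.

```scala
import org.apache.flink.cep.pattern.conditions.SimpleCondition

// Assumed implementation: accepts every record and holds no mutable fields,
// so it makes no difference which keys share the same instance.
class AlwaysTrue extends SimpleCondition[(String, Double, Double, String, Int, Int)] {
  override def filter(value: (String, Double, Double, String, Int, Int)): Boolean = true
}
```

Because the condition keeps no counter of its own, the counting is left entirely to the times(2) quantifier, whose partial matches Flink tracks per key.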
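The reason you see this strange behavior is the way you use the variable c in EventoTesteConditionFunction. This variable is not scoped to a key. Flink uses a single instance of EventoTesteConditionFunction per task, and a task processes a subgroup of keys rather than a single key. The result of the computation, however, is scoped to the key, and so is the internal state that is derived from the function's results.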
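The effect of the shared field can be reproduced without Flink. The sketch below is plain Scala and only simulates the counting logic, assuming the condition is evaluated once per event in arrival order (as in the times(1) run with parallelism 1). The object and method names are hypothetical.

```scala
object CounterScope {
  // Ids of the twelve test events (a..l) from the question, in arrival order.
  val ids: Seq[Int] = Seq(1, 1, 1, 1, 1, 0, 2, 2, 2, 2, 2, 0)

  // One counter shared by all keys, like the field `c` in EventoTesteConditionFunction.
  def sharedCounter(ids: Seq[Int], n: Int): Seq[Int] = {
    var c = 0
    ids.filter { _ =>
      c += 1
      if (c == n) { c = 0; true } else false
    }
  }

  // One counter per key, which is what key-scoped state would provide.
  def perKeyCounter(ids: Seq[Int], n: Int): Seq[Int] = {
    val counts = scala.collection.mutable.Map.empty[Int, Int].withDefaultValue(0)
    ids.filter { id =>
      counts(id) += 1
      if (counts(id) == n) { counts(id) = 0; true } else false
    }
  }
}
```

With n = 2, sharedCounter yields the ids 1, 1, 0, 2, 2, 0, which is exactly the "real output" reported for times(1). perKeyCounter yields 1, 1, 2, 2, 0. The trailing 0 appears because id 0 also occurs twice in the input (events f and l), so a strict per-key count emits for l as well.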
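On a different note: for a use case like this I would rather recommend looking into ProcessFunction. The documentation also has an example similar to what you are trying to do, but it uses Flink's state, which is checkpointed and scoped to a key.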
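A rough sketch of that approach, not taken from the documentation: a KeyedProcessFunction that keeps its counter in a ValueState. The class name EveryNthEventFunction and the state name "count" are made up for illustration. The open(Configuration) signature assumes a Flink 1.x release like the one the question appears to use; newer releases changed this method.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical function: emits the id of every n-th event seen for the current key.
class EveryNthEventFunction(n: Int)
  extends KeyedProcessFunction[Int, (String, Double, Double, String, Int, Int), Int] {

  // Keyed state: Flink keeps one value per key and includes it in checkpoints.
  @transient private var count: ValueState[java.lang.Integer] = _

  // Assumes the Flink 1.x lifecycle method open(Configuration).
  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("count", classOf[java.lang.Integer]))
  }

  override def processElement(
      value: (String, Double, Double, String, Int, Int),
      ctx: KeyedProcessFunction[Int, (String, Double, Double, String, Int, Int), Int]#Context,
      out: Collector[Int]): Unit = {
    // value() returns null until the first update for this key.
    val seen = Option(count.value()).map(_.intValue).getOrElse(0) + 1
    if (seen == n) {
      out.collect(value._6) // emit the id, as the question's output does
      count.clear()         // start counting again for this key
    } else {
      count.update(seen)
    }
  }
}
```

It would be applied directly to the KeyedStream returned by keyBy(new TupleKeySelector()), for example with .process(new EveryNthEventFunction(2)), in place of the CEP pattern.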