Flink CEP not working in event time but working in processing time
When I run my Flink CEP code in processing time (the default configuration), I get the pattern matches I expect. When I configure the environment for event time, I get no pattern matches at all.
def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(3000) // checkpoint every 3000 msec
    val lines = env.addSource(consumerKafkaSource.consume("bank_transaction_2", "192.168.2.201:9092", "192.168.2.201:2181", "http://192.168.2.201:8081"))
    val eventdate = ExtractAndAssignEventTime.assign(lines, "unix", "datetime", 3) // extract the event time here
    val event = eventdate.keyBy(v => v.get("customer_id").toString.toInt)
    // Two consecutive FAILED events for the same customer
    val pattern1 = Pattern.begin[GenericRecord]("start").where(v => v.get("state").toString == "FAILED")
        .next("d").where(v => v.get("state").toString == "FAILED")
    val patternStream = CEP.pattern(event, pattern1)
    // latedata is an OutputTag defined elsewhere (not shown in this post)
    val warnID = patternStream.sideOutputLateData(latedata).select(value => {
        val v = value.mapValues(c => c.toList.toString)
        Json(DefaultFormats).write(v).replace("\\\"", "\"")
        //.replace("List(","{").replace(")","}")
    })
    val latedatastream = warnID.getSideOutput(latedata)
    latedatastream.print("late_data")
    warnID.print("warning")
    event.print("event")
    env.execute()
}
Timestamp extraction code:
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object ExtractAndAssignEventTime {
    def assign(stream: DataStream[GenericRecord], timeFormat: String, timeColumn: String, OutofOrderTime: Int): DataStream[GenericRecord] = {
        if (!timeFormat.equalsIgnoreCase("Unix")) {
            // Formatted date string: parse it with the given pattern
            val EventTimeStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(3)) {
                override def extractTimestamp(t: GenericRecord): Long = {
                    new java.text.SimpleDateFormat(timeFormat).parse(t.get(timeColumn).toString).getTime
                }
            })
            EventTimeStream
        }
        else {
            // Unix timestamp: used as-is; Flink expects epoch milliseconds here
            val EventTimeStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(OutofOrderTime)) {
                override def extractTimestamp(t: GenericRecord): Long = {
                    t.get(timeColumn).toString.toLong
                }
            })
            EventTimeStream
        }
    }
}
Please help me solve this. Thanks in advance!
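Since you are using an AssignerWithPeriodicWatermarks, you also need to set the auto-watermark interval (setAutoWatermarkInterval) so that Flink generates watermarks at that interval.
You can do this by calling env.getConfig.setAutoWatermarkInterval([interval]).
Event-time CEP is driven by watermarks, so if none are generated there is basically no output.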
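To make the last point concrete, here is a small plain-Scala sketch of how a watermark gates event-time output. It is a simplified model and not Flink's CEP operator: the names WatermarkGateSketch, Event, watermark and release are made up for illustration, and the 1000 ms bound is an assumed value. It only shows that events buffered in event time stay buffered until a watermark passes their timestamp.

```scala
// Simplified model only; this is not Flink's CEP operator.
object WatermarkGateSketch {
  // Hypothetical event type: an id plus an event time in epoch milliseconds.
  final case class Event(id: String, timestampMs: Long)

  // Bounded-out-of-orderness watermark: highest timestamp seen minus the allowed bound.
  def watermark(maxTimestampMs: Long, boundMs: Long): Long =
    maxTimestampMs - boundMs

  // Splits the buffer into events the watermark has passed (sorted by time)
  // and events that must keep waiting.
  def release(buffer: Seq[Event], watermarkMs: Long): (Seq[Event], Seq[Event]) = {
    val (ready, pending) = buffer.partition(_.timestampMs <= watermarkMs)
    (ready.sortBy(_.timestampMs), pending)
  }

  def main(args: Array[String]): Unit = {
    val buffered = Seq(Event("a", 1000L), Event("b", 3000L), Event("c", 2000L))

    // No watermark has arrived yet: nothing leaves the buffer, so nothing can match.
    val (readyBefore, _) = release(buffered, Long.MinValue)
    println(s"released without a watermark: ${readyBefore.map(_.id)}")

    // A watermark computed with an assumed 1000 ms bound releases the older events.
    val wm = watermark(buffered.map(_.timestampMs).max, boundMs = 1000L)
    val (readyAfter, pending) = release(buffered, wm)
    println(s"watermark=$wm released=${readyAfter.map(_.id)} pending=${pending.map(_.id)}")
  }
}
```

In processing time there is no such gate, which would be consistent with the same pattern matching there but staying silent in event time when watermarks never advance.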
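I ran into the same problem and have just "solved" it, although, as you will see, the fix does not make much sense (at least to me).
Explanation:
In my original code, I had this: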
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(1)
...
var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")
var tupleStream = stream.map(new S2TMapFunction())
tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())
val patternStream = CEP.pattern(tupleStream,pattern)
val result = patternStream.process(new MyPatternProcessFunction())
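According to my logging, neither SameRegionFunction nor MyPatternProcessFunction was being executed, which was very unexpected, to say the least.
Answer:
Since I was out of ideas, I decided to pass my stream through one more transformation, just to check whether my events were actually making it into the stream. So I sent tupleStream through a map operation, producing newTupleStream, like this: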
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
env.getConfig.setAutoWatermarkInterval(1)
...
var stream : DataStream[String] = env.readTextFile("/home/luca/Desktop/input")
/* I created 'DoNothingMapFunction', where the output event = input event*/
var tupleStream = stream.map(new S2TMapFunction())
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())
val pattern = Pattern.begin[(String,Double,Double,String,Int,Int)]("follow").where(new SameRegionFunction())
val patternStream = CEP.pattern(newTupleStream,pattern)
val result = patternStream.process(new MyPatternProcessFunction())
After that, SameRegionFunction and MyPatternProcessFunction did run.
Observation:
I changed the line:
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner()).map(new DoNothingMapFunction())
to this:
var newTupleStream = tupleStream.assignTimestampsAndWatermarks(new PlacasPunctualTimestampAssigner())
and it also worked. Apparently one more level of indirection is enough to make it work, although it is not clear to me why.
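A likely explanation for this observation: DataStream transformations such as assignTimestampsAndWatermarks do not modify the stream they are called on. They return a new stream. In the first snippet the returned stream was discarded, so CEP received a stream without timestamps or watermarks; in the later versions the result was kept in newTupleStream and passed to CEP.pattern, which is probably what fixed it rather than the extra map. Below is a minimal sketch of that wiring, assuming the older (pre-1.11) assigner API used in this thread. Event, EveryEventWatermarkAssigner and the sample data are hypothetical stand-ins for the classes not shown above.

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark

object KeepAssignedStreamSketch {
  // Hypothetical event type: (region, event time in epoch milliseconds).
  type Event = (String, Long)

  // Hypothetical stand-in for PlacasPunctualTimestampAssigner:
  // emits a watermark after every element.
  class EveryEventWatermarkAssigner extends AssignerWithPunctuatedWatermarks[Event] {
    override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long =
      element._2

    override def checkAndGetNextWatermark(lastElement: Event, extractedTimestamp: Long): Watermark =
      new Watermark(extractedTimestamp)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Made-up sample data in place of the text file used above.
    val raw: DataStream[Event] = env.fromElements(("A", 1000L), ("A", 2000L), ("B", 3000L))

    // Calling raw.assignTimestampsAndWatermarks(...) and ignoring the result
    // would leave `raw` unchanged. Keep the returned stream instead.
    val withTimestamps: DataStream[Event] =
      raw.assignTimestampsAndWatermarks(new EveryEventWatermarkAssigner)

    val pattern = Pattern.begin[Event]("follow").where((e: Event) => e._1 == "A")

    // Hand the stream that carries timestamps and watermarks to CEP.
    CEP.pattern(withTimestamps, pattern)
      .select((m: scala.collection.Map[String, Iterable[Event]]) => m("follow").head.toString)
      .print("match")

    env.execute("keep-assigned-stream-sketch")
  }
}
```

The same rule applies to map, keyBy and the other transformations: the variable passed to CEP.pattern has to be the one returned by the last transformation you want applied.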