Flink CEP PojoSerializer 错误的多态解析
Flink CEP PojoSerializer wrong polymorphism resolution
目前,当我打印 CEP 模式的结果时,我有一个非常奇怪的行为。
数据模型如下:
- 事件:(类型:字符串,timestamp:Long)
- VehicleRelated 扩展事件:(vehicleId:Integer)
- Position extends VehicleRelated: (pos:Integer, direction:Integer)
- Recognize extends VehicleRelated: (pos:Integer, id:Integer, direction:Integer)
CEP 部分如下所示:
val pattern = Pattern
.begin[VehicleRelated]("before")
.subtype(classOf[Position])
.next("recognize")
.subtype(classOf[Recognize])
.next("after")
.subtype(classOf[Position])
.within(Time.seconds(5))
val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
.select(pattern => {
val s = pattern("recognize").head.asInstanceOf[Recognize]
LOG.debug(s.toString)
s
})
recognitions.print("RECO")
日志输出如下:
14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())
现在最大的问题是,为什么在 return 转换对象后 vehicleId 属性为空?有什么建议吗?
Update 我做了一些调查,发现问题出在 PojoSerializer 上。复制函数被调用,第 151 行 this.numFields
是错误的。计数仅包括 Recognize class 本身的属性计数,但不包括继承的 classes,在此case Event 和 VehicleRelated.. 属性类型和时间戳也为空..
问题是 flink 内部 POJO 序列化程序无法正确解析多态性。
因此我将 Kyro 序列化程序设置为默认值:
val config = env.getConfig
config.enableForceKryo()
目前,当我打印 CEP 模式的结果时,我有一个非常奇怪的行为。
数据模型如下:
- 事件:(类型:字符串,timestamp:Long)
- VehicleRelated 扩展事件:(vehicleId:Integer)
- Position extends VehicleRelated: (pos:Integer, direction:Integer)
- Recognize extends VehicleRelated: (pos:Integer, id:Integer, direction:Integer)
CEP 部分如下所示:
val pattern = Pattern
.begin[VehicleRelated]("before")
.subtype(classOf[Position])
.next("recognize")
.subtype(classOf[Recognize])
.next("after")
.subtype(classOf[Position])
.within(Time.seconds(5))
val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
.select(pattern => {
val s = pattern("recognize").head.asInstanceOf[Recognize]
LOG.debug(s.toString)
s
})
recognitions.print("RECO")
日志输出如下:
14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())
现在最大的问题是,为什么在 return 转换对象后 vehicleId 属性为空?有什么建议吗?
Update 我做了一些调查,发现问题出在 PojoSerializer 上。复制函数被调用,第 151 行 this.numFields
是错误的。计数仅包括 Recognize class 本身的属性计数,但不包括继承的 classes,在此case Event 和 VehicleRelated.. 属性类型和时间戳也为空..
问题是 flink 内部 POJO 序列化程序无法正确解析多态性。
因此我将 Kyro 序列化程序设置为默认值:
val config = env.getConfig
config.enableForceKryo()