Flink CEP PojoSerializer 错误的多态解析

Flink CEP PojoSerializer wrong polymorphism resolution

目前,当我打印 CEP 模式的结果时,我有一个非常奇怪的行为。

数据模型如下:

CEP 部分如下所示:

val pattern = Pattern
  .begin[VehicleRelated]("before")
  .subtype(classOf[Position])
  .next("recognize")
  .subtype(classOf[Recognize])
  .next("after")
  .subtype(classOf[Position])
  .within(Time.seconds(5))

val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
  .select(pattern => {
    val s = pattern("recognize").head.asInstanceOf[Recognize]
    LOG.debug(s.toString)
    s
  })

recognitions.print("RECO")

日志输出如下:

14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())

现在最大的问题是,为什么在 return 转换对象后 vehicleId 属性为空?有什么建议吗?

Update 我做了一些调查,发现问题出在 PojoSerializer 上。复制函数被调用,第 151 行 this.numFields 是错误的。计数仅包括 Recognize class 本身的属性计数,但不包括继承的 classes,在此case Event 和 VehicleRelated.. 属性类型和时间戳也为空..

问题是 flink 内部 POJO 序列化程序无法正确解析多态性。

因此我将 Kyro 序列化程序设置为默认值:

val config = env.getConfig
config.enableForceKryo()