flink cep pojoserializer错误多态性解析

eqqqjvef  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(260)

目前,我有一个非常奇怪的行为,当我打印的cep模式的结果。
数据模型如下:
事件:(type:string, timestamp:long)
车辆相关事件:(vehicleid:integer)
与车辆相关的位置:(pos:integer, direction:integer)
车辆相关:(pos:integer,id:整数,direction:integer)
cep部分如下所示:

val pattern = Pattern
  .begin[VehicleRelated]("before")
  .subtype(classOf[Position])
  .next("recognize")
  .subtype(classOf[Recognize])
  .next("after")
  .subtype(classOf[Position])
  .within(Time.seconds(5))

val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
  .select(pattern => {
    val s = pattern("recognize").head.asInstanceOf[Recognize]
    LOG.debug(s.toString)
    s
  })

recognitions.print("RECO")

日志的输出如下:

14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())

现在最大的问题是,为什么在返回casted对象后vehicleid属性为空?有什么建议吗?
更新我做了一些调查,发现pojoserializer是问题所在。copy函数在第151行被调用 this.numFields 是错的。。计数仅包括识别类本身的属性计数,但不包括继承类,在本例中为事件和车辆相关。。属性类型和时间戳也为空。。

ijxebb2r

ijxebb2r1#

问题是flink内部pojo序列化程序无法正确解析多态性。
因此,我将kyro序列化程序设置为默认值:

val config = env.getConfig
config.enableForceKryo()

相关问题