目前,我有一个非常奇怪的行为,当我打印的cep模式的结果。
数据模型如下:
事件:(type:string, timestamp:long)
车辆相关事件:(vehicleid:integer)
与车辆相关的位置:(pos:integer, direction:integer)
车辆相关:(pos:integer,id:整数,direction:integer)
cep部分如下所示:
val pattern = Pattern
.begin[VehicleRelated]("before")
.subtype(classOf[Position])
.next("recognize")
.subtype(classOf[Recognize])
.next("after")
.subtype(classOf[Position])
.within(Time.seconds(5))
val patternStream = CEP.pattern(actionEvents, pattern)
val recognitions = patternStream
.select(pattern => {
val s = pattern("recognize").head.asInstanceOf[Recognize]
LOG.debug(s.toString)
s
})
recognitions.print("RECO")
日志的输出如下:
14:45:27,286 DEBUG stoff.schnaps.RecognizingJob$ - Recognize(VehicleId: 2, Id: 601, Pos: 1601, Direction: 35, Add: Map())
RECO:8> Recognize(VehicleId: null, Id: 601, Pos: 1601, Direction: 35, Add: Map())
现在最大的问题是,为什么在返回casted对象后vehicleid属性为空?有什么建议吗?
更新我做了一些调查,发现pojoserializer是问题所在。copy函数在第151行被调用 this.numFields
是错的。。计数仅包括识别类本身的属性计数,但不包括继承类,在本例中为事件和车辆相关。。属性类型和时间戳也为空。。
1条答案
按热度按时间ijxebb2r1#
问题是flink内部pojo序列化程序无法正确解析多态性。
因此,我将kyro序列化程序设置为默认值: