flink序列化

flvlnr44  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(449)

我必须处理时区和纳秒时间分辨率。因此我使用ZoneDateTime。显然,ApacheFlink没有正确序列化ZoneDateTime。它确实按预期序列化了localdatetime部分,但是它忘记了处理时区。
例如,当我在flink stream map函数中记录一个分区日期时,我总是得到如下结果

2018-03-01T04:10:30.773471918null

而在数据开始时,我得到了合适的区域

2018-03-01T04:10:30.773471918-05:00

空值表示区域。当然,稍后我会得到一个空指针异常,因为我必须使用适当的时间比较,这需要区域。
我怎样才能最容易地解决这个问题?谢谢你的回复。

fcipmucu

fcipmucu1#

我不完全理解为什么它不接收序列化程序。这个解决方案至少有效:我为zoneDateTime实现了一个kryo序列化程序

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.markatta.timeforscala.ZonedDateTime

class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
  setImmutable(true)

  override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
    ZonedDateTimeSerializer.write(out, obj)
  }

  override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
    ZonedDateTimeSerializer.read(in)
  }
}

object ZonedDateTimeSerializer {
  def write(out: Output, obj: ZonedDateTime): Unit = {
    LocalDateSerializer.write(out, obj.toLocalDate)
    LocalTimeSerializer.write(out, obj.toLocalTime)
    ZoneIdSerializer.write(out, obj.getZone)
  }

  def  read(in: Input): ZonedDateTime = {
    val date = LocalDateSerializer.read(in)
    val time = LocalTimeSerializer.read(in)
    val zone = ZoneIdSerializer.read(in)
    ZonedDateTime(date, time, zone)
  }
}

我从最新的实现kyro获取了实现。然后我登记如下:

env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])

这似乎解决了问题。不确定它是否来自于我使用timesforscala这个事实,但是我想使用这个库,因为它添加了我依赖的重要的附加内容。欢迎评论。

相关问题