In a Flink project I use a case class named click:
case class click(date: LocalDateTime, stbId: String, channelId: Int)
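This class populates a DataSet, and everything ran fine while date was a Java 8 java.time.LocalDateTime. After switching to org.joda (version 2.9) in a Java 7 environment, calls on the click objects in the DataSet no longer behave as before. Accessing certain functions on the date field of a click object throws a NullPointerException. Examples of such functions are getHourOfDay and toString.
I can confirm that the date field of the click class is not null. I suspect that the Joda-Time library interacts badly with Kryo serialization. See "joda DateTime format cause null pointer error in spark RDD functions" or "NPE in Spark with Joda DateTime".
In the Flink API there is org.apache.flink.api.java.typeutils.runtime.kryo.Serializers with the static method registerJodaTime, which seems related. I simply tried: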
import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)
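That was not enough. Am I on the right track? How should java.typeutils.runtime.kryo be used?
Versions used: Flink 0.9.1, Scala 2.10, Joda-Time 2.9.
Follow-up: here is the code with the suggested additions (thanks to Fabian and Robert):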
val env = ExecutionEnvironment.getExecutionEnvironment
//import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)
In the log file of the embedded execution I can find the following relevant sections:
16:44:53,998 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
16:44:54,545 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map()
16:44:57,369 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
Nevertheless, I still see the following:
Exception in thread "main" java.lang.NullPointerException
at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
at java.lang.String.valueOf(Unknown Source)
at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
at myflink.click.toString(Ingestor.scala:20)
...
2 Answers
Answer 1
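Flink falls back to Kryo for types that it cannot serialize itself. LocalDateTime is such a class. Unfortunately, Kryo cannot serialize it correctly either, so we have to tell Kryo how to handle it by providing a dedicated serializer for this class.
Add de.javakaffee:kryo-serializers as a dependency. (Note that adding this dependency may cause problems when running Flink on a cluster. Please let me know if it does.)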
Then register the new serializer for LocalDateTime with the ExecutionEnvironment.
I hope this helps. (I am keeping the old answer for reference.)
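The follow-up log in the question shows that registerJodaTime registered serializers only for org.joda.time.DateTime and org.joda.time.Interval, not for LocalDateTime. A minimal sketch of the registration step described above, written in Scala against the Flink 0.9 batch API, could look like the following. It assumes that the kryo-serializers version on the classpath provides de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer and that the Scala ExecutionEnvironment exposes registerTypeWithKryoSerializer with a serializer class argument. The object name and the sample record are made up for illustration.

```scala
import org.apache.flink.api.scala._
import org.joda.time.LocalDateTime
// Assumption: this serializer class is shipped by the kryo-serializers version in use.
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer

// Same shape as the case class from the question.
case class click(date: LocalDateTime, stbId: String, channelId: Int)

// Hypothetical driver object, for illustration only.
object JodaRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the dedicated serializer on the environment that runs the job,
    // before any DataSet containing LocalDateTime values is created.
    env.registerTypeWithKryoSerializer(
      classOf[LocalDateTime],
      classOf[JodaLocalDateTimeSerializer])

    // Made-up sample record.
    val clicks = env.fromElements(
      click(new LocalDateTime(2015, 11, 2, 16, 44), "stb-1", 7))

    // Touch the Joda field, which is the kind of access that failed in the question.
    clicks.map(c => c.date.getHourOfDay).print()
  }
}
```

If the registration takes effect, the DEBUG output for "Registered Kryo with Serializer Classes types" should contain an entry for org.joda.time.LocalDateTime in addition to the DateTime and Interval entries.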
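Some general notes on debugging Kryo/serializer issues in Flink:
When executing a job locally (this should also work when submitting with ./bin/flink, but the output may end up in the log/ directory), you should see a line like the "The job has 2 registered types and 0 default Kryo serializers" line in the question's log. The number of registered types and Kryo serializers should be greater than 0.
With the DEBUG log level (replace INFO with DEBUG in log4j.properties), you get more detailed information about the registered serializers.
Answer 2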
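You should register the Joda serializers in the ExecutionConfig of your ExecutionEnvironment, that is, pass env.getConfig to Serializers.registerJodaTime rather than a new ExecutionConfig. Hope this helps.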