flink with guava cache-processfunction的实现不可序列化

vptzau2j  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(509)

我实现了一个processfunction,它使用guava缓存过滤传入事件流。代码如下所示:

object myJob {
 private def updateCache(cacheObject, someValue) = {}
 private def getCacheValue(cacheObject, someKey) = {}

 override def run(params, executionEnv) = {
  val inputStream = executionEnv.stream

  val c = CacheBuilder.newBuilder()

  val outStream = inputStream.process(new ProcessFunction() { 
    updateCache()
    getCacheValue} 
    )
 }
}

向flink提交作业时,出现以下错误:

Caused by: org.apache.flink.api.common.InvalidProgramException: The implementation of the ProcessFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:666)
at org.apache.flink.streaming.api.scala.DataStream.process(DataStream.scala:686)

知道我做错了什么吗?如何解决此序列化错误?

zkure5ic

zkure5ic1#

这个错误基本上是说您依赖于一个不能为flink序列化的对象。在您显示的情况下,将带有加载程序的字段标记为lazy应该可以解决这个问题:

lazy val c = CacheBuilder.newBuilder()

一般来说,在这种情况下,您应该参考flink的文档,该文档解释了这个问题

相关问题