flink-使用匕首注射-不可序列化?

yyyllmsg  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(381)

我使用flink(最新通过git)从Kafka流到Cassandra。为了简化单元测试,我通过dagger添加了依赖注入。
objectgraph似乎正在正确设置自身,但flink将“内部对象”标记为“不可序列化”。如果我直接包含这些对象,它们会工作-那么有什么区别?
类实现了mapfunction和@inject,分别为cassandra和读取配置文件注入了一个模块。
有没有一种方法来建立这个,所以我可以使用后期绑定或Flink使这不可能?

编辑:

fwiw依赖注入(通过dagger)和richmapfunction不能共存。匕首不允许你在定义中包含任何有扩展的对象。

进一步:

通过dagger lazy示例化的对象也不会序列化。
线程“main”org.apache.flink.api.common.invalidprogramexception中出现异常:object com.someapp。savemap@2e029d61 不可序列化
...
原因:java.io.notserializableexception:dagger.internal.lazybinding$1

sbdsn5lh

sbdsn5lh1#

在深入讨论问题的细节之前,先了解一下apache flink中函数的可序列化性的一些背景知识:

可串行化

ApacheFlink使用java序列化(java.io.serializable)来发布函数对象(这里是 MapFunction )给那些并行执行它们的工人。因此,函数需要是可序列化的:函数不能包含任何不可序列化的字段,即不是基元(int,long,double,…)且不实现的类型 java.io.Serializable .
处理不可序列化结构的典型方法是延迟初始化它们。

延迟初始化

在flink函数中使用不可序列化类型的一种方法是延迟初始化它们。保存这些类型的字段仍然是 null 当函数被序列化以发送时,并且仅在worker对函数进行反序列化之后设置。
例如,在scala中,可以简单地使用惰性字段 lazy val x = new NonSerializableType() . 这个 NonSerializableType 类型实际上只在第一次访问变量时创建 x ,通常在工人身上。因此,类型可以是不可序列化的,因为 x 当函数被序列化为传送到worker时为null。
在java中,可以初始化 open() 函数的方法,如果你使它成为一个丰富的函数。丰富的功能(如 RichMapFunction )是基本函数的扩展版本(这里是 MapFunction )给你生命周期的方法,比如 open() 以及 close() .

惰性依赖注入

我不太熟悉依赖注入,但dagger似乎也提供了类似于惰性依赖的东西,这可能有助于解决类似scala中惰性变量的问题:

new MapFunction<Long, Long>() {

  @Inject Lazy<MyDependency> dep;

  public Long map(Long value) {
    return dep.get().doSomething(value);
  }
}
50pmv0ei

50pmv0ei2#

我也面临类似的问题。有两种方法可以不反序列化依赖项。
使依赖关系保持静态,但这并不总是可能的。它也会扰乱你的代码设计。
使用transient:通过将依赖项声明为transient,您的意思是它们不是对象持久状态的一部分,也不应该是序列化的一部分。

public ClassA implements Serializable{
  //class A code here
}

public ClassB{
  //class B code here
}

public class MySinkFunction implements SinkFunction<MyData> {
  private ClassA mySerializableDependency;
  private transient ClassB nonSerializableDependency;
}

这在使用外部库时尤其有用,因为您无法更改外部库的实现以使其可序列化。

相关问题