在flink中序列化复杂模型的最佳实践

o2g1uqev  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(695)

我正在使用udf将一些ml模型应用于数据流。因为 Model 类(来自第三方库)不能由flink自动序列化,我使用两个变量,如下所示:

class MyUDF extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _

  ...
}

哪里: models 保存加载的(正在运行的)模型(从 ModelDef ,基本上是字符串) modelsBytes 是真正的(键控)状态,它持有相同的模型,但作为一个字节块,以便检查点工作良好。
整个解决方案很简单(只需要调用 fromBytes / toBytes 但我不知道这是否是一种常见/最佳实践。对于本质上相同的事物有两个变量看起来很奇怪。例如,在这里您可以找到一个使用 TypeSerializer[Option[Model]] 相反,这看起来更干净,但实施起来也更复杂。
所以,基本上:
我应该用这个吗 TypeSerializer 运行/序列化模型的方法或以某种方式复制的状态可以吗?
另外,如果你能给我指出一些关于flink中自定义类型序列化的文档/示例,那就太好了,通常我发现官方文档在这方面有点欠缺。

j2cgzkjk

j2cgzkjk1#

为了性能而明智地对数据进行非规范化是一种非常常见的模式。如果你没有使用太多的内存,那么坚持这种方法。

j9per5c4

j9per5c42#

我可能会使用堆状态后端和自定义 TypeSerializer .
堆状态后端将只序列化检查点上的数据,否则将保持数据原样。因此,在使用后端而不是管理Map本身时,几乎没有性能损失。但是,它将消除手动执行序列化和同步的需要。

相关问题