我正在使用udf将一些ml模型应用于数据流。因为 Model
类(来自第三方库)不能由flink自动序列化,我使用两个变量,如下所示:
class MyUDF extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
with CheckpointedFunction {
// To hold loaded models
@transient private var models: HashMap[(String, String), Model] = _
// For serialization purposes
@transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _
...
}
哪里: models
保存加载的(正在运行的)模型(从 ModelDef
,基本上是字符串) modelsBytes
是真正的(键控)状态,它持有相同的模型,但作为一个字节块,以便检查点工作良好。
整个解决方案很简单(只需要调用 fromBytes
/ toBytes
但我不知道这是否是一种常见/最佳实践。对于本质上相同的事物有两个变量看起来很奇怪。例如,在这里您可以找到一个使用 TypeSerializer[Option[Model]]
相反,这看起来更干净,但实施起来也更复杂。
所以,基本上:
我应该用这个吗 TypeSerializer
运行/序列化模型的方法或以某种方式复制的状态可以吗?
另外,如果你能给我指出一些关于flink中自定义类型序列化的文档/示例,那就太好了,通常我发现官方文档在这方面有点欠缺。
2条答案
按热度按时间j2cgzkjk1#
为了性能而明智地对数据进行非规范化是一种非常常见的模式。如果你没有使用太多的内存,那么坚持这种方法。
j9per5c42#
我可能会使用堆状态后端和自定义
TypeSerializer
.堆状态后端将只序列化检查点上的数据,否则将保持数据原样。因此,在使用后端而不是管理Map本身时,几乎没有性能损失。但是,它将消除手动执行序列化和同步的需要。