确保flink中protobuf状态对象的状态模式演化

ekqde3dh  于 2021-07-15  发布在  Flink
关注(0)|答案(0)|浏览(320)

我想弄清楚如何为protobuf状态启用模式演化。我们的flink程序是用scala2.11编写的,我们使用flink1.10.1和scalapb生成protobuf代码。rocksdb用作状态后端。
我在写一封信 TypeSerializer 对于由4个双字段组成的protobuf消息。这是一个poc,所以我选择的消息格式非常简单。
Flink的这方面似乎没有充分的记载。我尽可能地实现了所有必需的方法 TypeSerializer 除外 snapshotConfiguration . 中途我发现 ProtobufSerializer , ProtobufTypeSerializer 以及 ProtobufTypeSerializerSnapshot JavaAPI文档中的类,这些类在文章中根本没有提到。
现在,具体问题:
我该怎么写 snapshotConfiguration 方法?我在这里画了一个空白,特别是在序列化程序快照中包含什么,以便能够确定序列化程序之间的兼容性( TypeSerializerSnapshot#resolveSchemaCompatibility ).
是什么 ProtobufSerializer 以及 ProtobufTypeSerializer 为了什么?我应该实现它们而不是 TypeSerializer 为了我的需要?
我不确定我写了什么 getLength 正确地。我假设4个double需要4 x 24=96字节,但是protobuf消息的长度不是恒定的,即使是4个double。
你的孩子怎么了 TypeSerializer 接受另一个t并在t不可变时尝试重用它的方法?在我的情况下,他们抛出了一个例外。他们是否被称为 isImmutableType = true ?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题