flink状态模式迁移

egdjgwm8  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(495)

我有一个独立集群上的flink流应用程序,它使用memorystatebackend。kryo的taggedfieldserializer被用作默认序列化程序。
当我更改状态的模式并重新部署应用程序时,出现了以下异常

Caused by: org.apache.flink.util.StateMigrationException: State migration isn't supported, yet.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:209)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:142)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createValueState(HeapKeyedStateBackend.java:234)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend$1.createValueState(AbstractKeyedStateBackend.java:315)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:312)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:392)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)

如果有人建议我解决这个问题,或者我应该使用fsstatebackend来解决这个问题,这将非常有帮助。
p、 如果我想使用s3上的fsstatebackend来运行在独立集群上的flink应用程序,那么必须做哪些配置更改。

t5zmwmid

t5zmwmid1#

使用 FsStateBackend 不会解决这个问题,因为它还使用 HeapKeyedStateBackend 在引擎盖下,这就是引发这个异常的原因。
flip-22有助于解决这种状态迁移问题,但它还没有实现。
目前我听说的最好的选择是使用基于avro的序列化程序,因为它可以实现为无缝地处理新旧模式。但这不适合胆小的人。
关于fsstatebackend配置,请参阅这里的(编写良好的)文档。

相关问题