Apache Spark Kryo:反序列化旧版本的类

7jmck4yq  于 2023-04-07  发布在  Apache
关注(0)|答案(2)|浏览(205)

我需要通过添加两个新参数来修改一个类。这个类是用Kryo序列化的。我目前正在持久化与这个类相关的信息,除其他外,作为RDD,每次我停止我的流。当我重新启动流时,我加载我以前持久化的信息,并使用它们在我停止和重新启动时保持一致性。
由于我持久化的类需要这些新参数,所以我更改了类和序列化器,为新参数添加了新的kryo.writeObject(output, object, ObjectSerializer)kryo.readObject(input, classOf[Object], ObjectSerializer)
现在,每当我重新启动我的流时,我都会获得一个异常:“遇到未注册的类..."。
这看起来很明显,因为我试图反序列化一个对象,而这个对象不包含在我停止流时所保存的信息中。如果我删除这些数据并启动流,就好像它以前没有运行过一样,那么异常就不会发生。
有没有办法避免这种异常?也许可以通过指定一些默认值来防止这些参数丢失?
谢谢你

编辑:

我发现了一些以前没见过的有用的东西:Kryo issue 194
这个家伙通过简单地插入一个长的定义他应该使用哪个版本的反序列化器来实现版本控制。这是一个简单的解决方案,但是,由于编写我正在工作的代码的公司没有考虑向前兼容性,我想我必须扔掉所有在新序列化器之前持久化的数据。
请让我知道,如果你任何人都可以提出一个更好的解决方案。

编辑2:

仍然有这种情况的问题。我尝试使用CompatibleFieldSerializer,如下所述:CompatibleFieldSerializer Example所以通过注册这个序列化器,而不是以前使用的自定义序列化器。结果是,现在,当重新加载持久化数据时,它会给出java.lang.NullPointerException。如果没有以前的数据持久化,仍然没有问题。我可以启动我的流,序列化新数据,停止流,反序列化并重新启动我的流。仍然没有解决方案的线索。

vtwuwzda

vtwuwzda1#

这个问题的解决方案是在几个月前找到的。所以我想尽快发布这个问题的答案。问题在于,由于代码中的错误,类被序列化为标准Kryo FieldSerializer,这是不向前兼容的。我们必须执行以下操作来反序列化旧类并将其转换为新的序列化类。
情况是:

case class ClassA(field1 : Long, field2 : String)

是这样连载的:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       classA( 
           field1 = input.readLong(),
           field2 = input.readLong()
       )

并且循环了一个Seq,其中包含要用序列化程序序列化的类,以便为所有类注册所有序列化程序。

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    final def register(kryo: Kryo) = {
         registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }

需要通过添加一个新字段来修改这个类,该字段是另一个case类的示例。
为了执行这样的更改,我们必须使用与Kryo库“可选”有关的注解,

...
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional
import scala.annotation.meta.field
...

case class ClassA(field1 : Long, field2 : String,  @(Optional @field)("field3") field3 : ClassB)

序列化器被修改,例如当阅读旧的序列化类时,它可以用默认值示例化field3,并且当写入时,写入这样的默认值:

object ClassASerializer extends Serializer[ClassA] with Serializable{
  override def write(kryo: Kryo, output: Output, t: ClassA) = {
      output.writeLong    { t.field1 }
      output.writeString  { t.field2 }
      kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer)

 }
  override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
       ClassA( 
           field1 = input.readLong(),
           field2 = input.readLong(),
           field3 = ClassB.default
       )

kryo序列化器注册也被修改为注册可选字段:

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ...
    def optionals = Seq("field3")

    final def register(kryo: Kryo) = {
        optionals.foreach { optional =>
        kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) }
        registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) }
    }

因此,我们可以编写新版本的序列化类。之后,我们必须删除可选的注解,修改序列化器以便从新序列化类中读取真实的字段,并删除可选的序列化器注册并将其添加到注册表Seq。
同时,我们纠正了代码中通过FieldSerializer强制序列化的错误,但这不在问题的范围内。

h7appiyu

h7appiyu2#

我假设您遇到所有这些麻烦是因为现有数据是用FieldSerializer序列化的,这会阻止您在保持向后兼容性的同时添加新字段。
如果是这样的话,那么我发现一件事很有用
1.将所有当前数据从使用FieldSerializer序列化的数据迁移到使用VersionFieldSerializer序列化的数据,使用如下方法:在这里,您需要使用两个序列化器注册子序列化器。

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.VersionFieldSerializer;

public class FieldToVersionFieldMigrationSerializer<T> extends VersionFieldSerializer<T> {

  private final FieldSerializer<T> fieldSerializer;

  public FieldToVersionFieldMigrationSerializer(Kryo kryo, Class<T> type, FieldSerializer<T> fieldSerializer) {
    super(kryo, type);
    this.fieldSerializer = fieldSerializer;
  }

  @Override
  public T read(Kryo kryo, Input input, Class<T> type) {
    try {
      return super.read(kryo, input, type);
    } catch (Exception e) {
      return fieldSerializer.read(kryo, input, type);
    }
  }
}

1.迁移所有数据以支持VersionFieldSerializer后,可以引入新字段,并使用@Since对其进行注解以支持向后兼容性
此外,如果您已经解决了这个问题,请让我们知道您是如何做到的。

相关问题