Unable to serialize a sealed trait in Akka with jackson-cbor

bksxznpy  posted on 2022-11-06 in Other

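I'm trying to modify the Akka distributed workers sample project. Basically, I want to create more kinds of Work items that are sent from the FrontEnd, computed in a distributed fashion by the WorkExecutor, and whose results are then returned to the FrontEnd.
My problem is that I want to extend Work in a fairly generic way: other kinds of work should extend Work, so that, for example, WorkManager only ever has to deal with Work.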

sealed trait Work extends CborSerializable {
  def workId: String

  def job: Int
}

case class MyWorkItem(workId: String, job: Int) extends Work

Unfortunately, jackson-cbor fails on Work: the events are written, but they cannot be deserialized again during recovery.

4163 [ClusterSystem-akka.actor.default-dispatcher-5] ERROR org.test.worker.WorkManager$ - Supervisor StopSupervisor saw failure: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Cannot construct instance of `org.test.worker.Work` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (byte[])"�dwork�fworkIdx$f031c8c6-6b7e-4e63-9beb-e97330eea35ecjob��"; line: -1, column: 6] (through reference chain: org.test.worker.WorkState$WorkAccepted["work"])
akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Cannot construct instance of `org.test.worker.Work` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (byte[])"�dwork�fworkIdx$f031c8c6-6b7e-4e63-9beb-e97330eea35ecjob��"; line: -1, column: 6] (through reference chain: org.test.worker.WorkState$WorkAccepted["work"])
    at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:220)
    at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:153)
    at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:91)
    at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:66)
    at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
    at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
    at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:171)
    at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
    at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
    at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:124)
    at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
    at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
    at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:87)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
    at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
    at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:124)
    at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
    at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
    at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
    at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:129)
    at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Answer #1 (pxyaymoc)

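The error is:
Cannot construct instance of `org.test.worker.Work`
This is because Work is an abstract type and therefore cannot be instantiated.
The answer is also in the error message:
abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
Note that the fact that it is sealed is not relevant; the problem is that it is a trait rather than a concrete class.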
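
As an illustration of the first option in that message ("mapped to concrete types"), here is a minimal sketch that assumes MyWorkItem is the only implementation of Work. The CborSerializable marker trait is redeclared only so the snippet stands on its own; in the sample project it already exists. `@JsonDeserialize(as = ...)` is a standard Jackson annotation, but whether it fits depends on your setup, and it stops being sufficient as soon as a second subtype is added.

```scala
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

// Marker trait from the sample project, repeated here so the sketch is self-contained.
trait CborSerializable

// Assumption: MyWorkItem is the ONLY implementation of Work.
// The annotation tells Jackson which concrete class to build
// whenever a field is declared with the abstract type Work.
@JsonDeserialize(as = classOf[MyWorkItem])
sealed trait Work extends CborSerializable {
  def workId: String
  def job: Int
}

final case class MyWorkItem(workId: String, job: Int) extends Work
```

With several kinds of work this mapping is ambiguous, so the third option (additional type information) is the one that generalizes.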

Answer #2 (6yt4nkrj)

TL;DR

final case class Zoo(primaryAttraction: Animal) extends MySerializable

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
  Array(
    new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
    new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
sealed trait Animal

final case class Lion(name: String) extends Animal

final case class Elephant(name: String, age: Int) extends Animal

The above is copied from https://doc.akka.io/docs/akka/current/serialization-jackson.html#polymorphic-types

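Explanation

Imagine you have a method that accepts a Work instance. The problem is that there may be many implementations of the Work trait, so Akka (or rather Jackson) does not know which implementation of Work the incoming JSON should be mapped to. By using JsonTypeInfo and JsonSubTypes you instruct Jackson to add a type property to the serialized JSON and then to use that property to deserialize the JSON back into your concrete class.
You might say that your trait is sealed, so Jackson/Akka could figure out that you have only one implementation and use it, but I guess this is not implemented and probably not that easy to implement.
This is a tricky one because the error only appears when a message is actually serialized and read back, for example when it is sent from one node in the cluster to another (or, as in the stack trace in the question, when persisted events are replayed during recovery). It does not appear when messages are sent between actors on the same node.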
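
Applied to the question's own types, the annotations might look like the sketch below. This is an assumption-laden illustration rather than the sample project's code: OtherWorkItem and the names "my-work-item" / "other-work-item" are hypothetical, CborSerializable is redeclared only to keep the snippet self-contained, and the round trip uses a plain Jackson JSON ObjectMapper with jackson-module-scala instead of Akka's CBOR serializer, simply to make the type property visible.

```scala
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Marker trait from the sample project, repeated so the sketch is self-contained.
trait CborSerializable

// Every concrete kind of work is registered under a logical name;
// Jackson writes that name into a "type" property and reads it back.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(
  Array(
    new JsonSubTypes.Type(value = classOf[MyWorkItem], name = "my-work-item"),
    new JsonSubTypes.Type(value = classOf[OtherWorkItem], name = "other-work-item")))
sealed trait Work extends CborSerializable {
  def workId: String
  def job: Int
}

final case class MyWorkItem(workId: String, job: Int) extends Work

// Hypothetical second kind of work, only here to show a second subtype.
final case class OtherWorkItem(workId: String, job: Int, note: String) extends Work

object WorkRoundTrip extends App {
  // Plain JSON mapper for illustration; Akka configures its own mapper for CBOR.
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  val original: Work = MyWorkItem("w-1", 42)
  val json = mapper.writeValueAsString(original)
  println(json) // the serialized form now carries a "type" property

  // Deserializing against the abstract type works because of the type property.
  val restored = mapper.readValue(json, classOf[Work])
  assert(restored == original)
}
```

One caveat that follows from the stack trace in the question: events already written to the journal before the annotations were added carry no type property, so replaying them would presumably still fail. In a sample project, starting from an empty journal is probably the simplest way around that.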
