我正在尝试修改Akka distrubuted workers sample项目。基本上,我想创建更多的Work
项,这些项将从FrontEnd
发送,然后由WorkExecutor
以分布式的方式进行计算,然后将结果返回到FrontEnd
。
我的问题是,我希望扩展Work
的方式能够以一种相当通用的方式(例如,WorkManager
应该只处理Work
)通过其他类型的工作(扩展Work
)。
sealed trait Work extends CborSerializable {
def workId: String
def job: Int
}
case class MyWorkItem(workId: String, job: Int) extends Work
不幸的是,jackson-cbor无法序列化Work
。
4163 [ClusterSystem-akka.actor.default-dispatcher-5] ERROR org.test.worker.WorkManager$ - Supervisor StopSupervisor saw failure: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Cannot construct instance of `org.test.worker.Work` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])"�dwork�fworkIdx$f031c8c6-6b7e-4e63-9beb-e97330eea35ecjob��"; line: -1, column: 6] (through reference chain: org.test.worker.WorkState$WorkAccepted["work"])
akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Exception during recovery. Last known sequence number [2]. PersistenceId [master], due to: Cannot construct instance of `org.test.worker.Work` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (byte[])"�dwork�fworkIdx$f031c8c6-6b7e-4e63-9beb-e97330eea35ecjob��"; line: -1, column: 6] (through reference chain: org.test.worker.WorkState$WorkAccepted["work"])
at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:220)
at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:153)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:91)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:66)
at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:171)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:124)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:87)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57)
at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:124)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:129)
at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
2条答案
按热度按时间pxyaymoc1#
错误为
无法构造
org.test.worker.Work
的示例这是因为
Work
是抽象类,因此无法构造。答案也在错误消息中:
抽象类型需要Map到具体类型,具有自定义反序列化程序,或者包含其他类型信息
请注意,它是
sealed
的事实并不相关,问题是它是trait
,而不是具体的class
。6yt4nkrj2#
TL/DR
以上是此链接的复制粘贴https://doc.akka.io/docs/akka/current/serialization-jackson.html#polymorphic-types
说明
假设您有一个接受
Work
示例的方法,但问题是您可能有许多Work
特征的实现,苏 akka (或者更确切地说Jackson)不知道需要将传入的JSONMap到Work
的哪个实现。通过使用JsonTypeInfo
和JsonSubTypes
,您可以指示Jackson将type
属性添加到序列化的JSON中,然后使用该属性将JSON反序列化回您的具体类。您可能会说您的特征是密封的,Jackson/Akka可能会发现您只有一个实现,并使用它,但我猜这没有实现,可能不太容易实现。
这是一个棘手的问题,因为仅当消息从集群中的一个节点发送到另一个节点时才会出现此错误,而当消息在同一节点上的参与者之间发送时不会出现此错误。