Scala akka事件源如何得到一个消息回根?

fae0ux8s  于 2022-11-05  发布在  Scala
关注(0)|答案(2)|浏览(222)

我目前正在努力阅读我的actor的状态,所以在本例中,我只想从我的State类中获取history参数--例如,在调用端点时打印它。
我以前曾成功地使用?operator实现了这一点,但我从未尝试过使用事件源。
到目前为止,我拥有的代码是:

object MyPersistentBehavior {
  sealed trait Command
  final case class Add(data: String) extends Command
  case object Clear extends Command

  sealed trait Event
  final case class Added(data: String) extends Event
  case object Cleared extends Event

  final case class State(history: List[String] = Nil)

  val commandHandler: (State, Command) => Effect[Event, State] = { (state, command) =>
    command match {
      case Add(data) => Effect.persist(Added(data))
      case Clear     => Effect.persist(Cleared)
    }
  }

  val eventHandler: (State, Event) => State = { (state, event) =>
    event match {
      case Added(data) => state.copy((data :: state.history).take(5))
      case Cleared     => State(Nil)
    }
  }

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(id),
      emptyState = State(Nil),
      commandHandler = commandHandler,
      eventHandler = eventHandler)
}

在我的main方法中,我想打印状态:

val personActor: ActorSystem[MyPersistentBehavior.Command] = ActorSystem(MyPersistentBehavior("IDDD"), "AHA")
//personActor ? GetState <- something like this

谢谢你!

xyhw6mcr

xyhw6mcr1#

我没有在Akka中使用过事件采购,但快速浏览了一下文档,我认为这可能会有所帮助:

case class GetState(replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command

// and in the match for commands:
...
    case GetState(replyTo) =>
      replyTo ! StatusReply.Success(state)

      // or if replyTo was of type ActorRef[State] => 
      replyTo ! state

Akka的事件采购文档中还有一个Effect,看起来很有趣:

def onCommand(subscriber: ActorRef[State], state: State, command: Command): Effect[Event, State] = {
  command match {
    case Add(data) =>
      Effect.persist(Added(data)).thenRun(newState => subscriber ! newState)
    case Clear =>
      Effect.persist(Cleared).thenRun((newState: State) => subscriber ! newState).thenStop()
  }
}

你也可以在每个效果后回复最新状态:

case class AddCommand(number: Int, replyTo: ActorRef[State]) extends Command

// in the command handler
    case add: AddCommand(num, replyTo) => 
      // your logic here
      Event.persist(Added(num)) thenRun { newState => 
        replyTo ! newState
      }

文档中还有很多其他选项,因此我强烈建议您查看一下:https://doc.akka.io/docs/akka/current/typed/persistence.html

yzuktlbb

yzuktlbb2#

好的,如果你需要一个完整的例子,请看我的blog
你会在博客和GitHub仓库中看到,在那里你会看到一个通用的'onReport'命令,用于所有参与者向调用者报告他们的状态。

def commandHandler(context: ActorContext[CreditSMEvent], cmd: CreditSMEvent, state: State): ReplyEffect[PersistEvent, State] =
      cmd match {
        case onReport(useCaseKey, replyTo) =>
            Effect.reply(replyTo)( ReportResponse(state, java.util.Collections.unmodifiableMap(state.controlObject)))

        case _ =>
            commandHandlerInternal(context, cmd, state)
}

你可以在博客中找到它。
我希望这能帮到你。

相关问题