如何在akka流中使用类型化akka来使用ActorSink.actorRefWithBackpressure

sxpgvts3  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(138)

我试图学习akka流使用akka类型的文档是有点抽象,当谈到akka类型
Sink.actorRefWithBackpressure示例非常简单易懂,因为ActorSink.actorRefWithBackpressure示例是抽象,
在第一个示例中,我们有AckingReceiver actor,它执行所需的工作,但在第二个示例中
没有case类的实现,因为它在AckingReceiver

val actor: ActorRef[Protocol] = targetActor()

我已经看到这个代码的一些地方,但我无法理解它以及

def targetActor(): ActorRef[Protocol] = ???

我们如何提供目标角色的实现来处理case类,如有任何帮助将不胜感激

agxfikkp

agxfikkp1#

ActorRef[Protocol]是一个类型化的参与者,在类型化的ActorSystem之外获得一个ActorRef比在经典的ActorSystem中要复杂得多,这可能就是为什么文档中省略了它(因为它对解释如何使用ActorSink.actorRefWithBackpressure并不重要)。
通常,您将设置一个键入的ActorSystem,并向ActorSystem请求一个ActorRef

import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl._

object MainSystem {
  sealed trait Command
  case class ObtainProtocolActor(replyTo: ActorRef[ProtocolActorIs])

  sealed trait Reply
  case class ProtocolActorIs(actor: ActorRef[Protocol])

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, msg) =>
      case ObtainProtocolActor(replyTo) =>
        val protocolActor: ActorRef[Protocol] = context.spawnAnonymous(
          // Define the protocol actor
          Behaviors.receive[Protocol] { (context, msg) =>
            case Init(ackTo) =>
              println(s"Actor ${context.self.path} initializing")
              ackTo ! Ack
              Behaviors.same
            case Message(ackTo, msg) =>
              println(s"Actor ${context.self.path} received $msg")
              ackTo ! Ack
              Behaviors.same
            case Complete =>
              context.stop()  // Delayed until the message is processed
              ackTo ! Ack
              Behaviors.same
            case Fail(ex) =>
              println(s"Actor ${context.self.path} got failure from stream: ${ex.getMessage}")
              Behaviors.same
          })
        context.watch(protocolActor)
        replyTo ! ProtocolActorIs(protocolActor)
    }.receiveSignal {
      case (context, Terminated(ref)) =>
        println(s"Actor ${ref.path} terminated")
    }
}

val actorSystem = ActorSystem(MainSystem(), "main")

def targetActor(): ActorRef[Protocol] = Await.result(
  actorSystem.ask(MainSystem.ObtainProtocolActor(_)).map(_.replyTo),
  15.second
)

这可能显示了经典和类型化之间两个最大的实际差异,但可能并不明显:

  • typed中的ActorSystem是一个参与者(在本例中,实际上有可能将ActorRef[Protocol]作为ActorSystem,尽管您实际上不太可能希望这样做)
  • 询问模式以相当戏剧性方式改变

相关问题