akka 问花样:如何使请求和回复具有上下文感知能力?

bgibtngc  于 2022-11-23  发布在  其他
关注(0)|答案(1)|浏览(162)

对于一个大学作业,我必须用Akka实现一个Raft协议的模拟(我使用Akka类型,使用Behaviors)。因此,使用ask模式是有意义的,正如在两个参与者之间带有询问的请求-响应示例中的文档所演示的那样。
在我的实现中,请求和响应必须是上下文感知的:这意味着,当执行查询的执行元收到响应时,它必须知道该响应是针对哪个查询的。文档中的示例建议在消息中包含查询ID。
我需要解决的问题可以用下面的例子来描述:

  • 执行元A向执行元B发送ID=1的查询(它在消息中包括查询ID)。
  • B没有及时回复(网络或B本身可能较慢),因此A向B重新发出ID=2的查询。
  • 参与者B接收到ID=1的查询,并回复参与者A(在消息中包括查询ID)。
  • 参与者A收到B的ID=1的回复。A知道它发送的最后一个查询的ID=2,因此不能处理该回复,而是等待ID=2的回复。

我认为,为了“过滤”没有正确查询ID的回复,我可以在参与者A中放置一个BehaviorInterceptor,它检查回复中的ID是否与预期的查询ID匹配。
总结一下:

  • 参与者A在散列表中写入参与者B的下一个回复所期望的查询ID,
  • 拦截器使用这个散列表来检查回复中的ID,这是一个好的设计吗?

而且,我不知道ask是否阻塞,理想情况下,我希望以非阻塞的方式使用ask:参与者A ask s参与者B,并且在等待B的回复时,A可以执行其他操作。在等待B的回复时,参与者A还可以根据需要更改其行为(也是不处理B的回复的行为)。
谢谢你的任何见解!

rseugnpd

rseugnpd1#

两个参与者之间的请求(使用ActorContext)是非阻塞的。
由于对给定目标的请求的高水位线是参与者协议状态的重要组成部分,因此我只将其存储在发出请求的参与者的状态中(例如,在Scala中为Map[ActorRef[Request], Int])。当接收到适应的响应时,第一件事是将响应中的ID与目标的高水印进行比较。
例如,在Scala中:

sealed trait RequestA
case class QueryB(target: ActorRef[RequestB]) extends RequestA
case class ResponseFromB(target: ActorRef[RequestB], id: Int, resp: ResponseB) extends RequestA
case class BTimedOut(target: ActorRef[RequestB], id: Int) extends RequestA

sealed trait RequestB

def buildRequestB(id: Int)(replyTo: ResponseB): RequestB = ???

sealed trait ResponseB

def aBehavior(highWater: Map[ActorRef[RequestB], Int]): Behavior[RequestA] =
  Behaviors.receive { (context, msg) =>
    case QueryB(target) =>
      implicit val timeout: Timeout = 10.seconds

      val nextHighwater = highWater.get(target).map(_ + 1).getOrElse(0)

      // request is sent and received "in the background"
      context.ask(target, buildRequestB(nextHighwater)) {
        case Success(resp) => ResponseFromB(target, nextHighwater, resp)
        case Failure(_) => BTimedOut(target, nextHighwater)
      }

      aBehavior(highWater + (target -> nextHighwater))

    case ResponseFromB(target, id, resp) =>
      if (highWater.get(target).contains(id)) {
        context.log.info("Accepting response: {}", resp)
        Behaviors.same
      } else {
        context.log.info("Ignoring response: {}", resp)
        Behaviors.same
      }

    case BTimedOut(target, id) =>
      context.log.warning("Ask of {} (sequence ID {}) timed out", target, id)
      Behaviors.same
  }

相关问题