akka http路由中如何处理长运行请求

xlpyo6sf  于 2022-11-06  发布在  其他
关注(0)|答案(1)|浏览(143)

我正在使用akka http,我的一个路由通过akka http客户端API与外部服务交互,并且httpRequest持续运行,我无法使其工作,这是我的用例-〉我正在与Janus服务器交互,并且在服务器使用“keepAlive”或“event”进行响应时,立即执行长轮询get请求,我再次请求,以此类推,服务器继续响应
所有这一切都发生在一个actor内部,我有一个akka http路由,它正在初始化第一个请求
这是我代码

final case class CreateLongPollRequest(sessionId:BigInt)
class LongPollRequestActor (config: Config) extends Actor {
  def receive = {
    case CreateLongPollRequest(sessionId) =>
      senderRef = Some(sender())
      val uri: String = "localhost:8080/" + sessionId
      val request = HttpRequest(HttpMethods.GET, uri)
      val responseFuture = Http(context.system).singleRequest(request)
      responseFuture
        .onComplete {
          case Success(res)
          Unmarshal(res.entity.toStrict(40 seconds)).value.map { result =>
              val responseStr = result.data.utf8String
              log.info("Actor LongPollRequestActor: long poll responseStr {}",responseStr)
              senderRef match {
                case Some(ref) =>
                  ref ! responseStr
                case None => log.info("Actor LongPollRequestActor: sender ref is null")
              }
            }
          case Failure(e) =>log.error(e)
          }
          }
          }

final case class JanusLongPollRequest(sessionId: BigInt)
class JanusManagerActor(childMaker: List[ActorRefFactory => ActorRef]) extends Actor {
  var senderRef: Option[akka.actor.ActorRef] = None
  val longPollRequestActor  = childMaker(1)(context)
  def receive: PartialFunction[Any, Unit] = {
    case JanusLongPollRequest(sessionId)=>
      senderRef = Some(sender)
      keepAlive(sessionId,senderRef)
}

    def keepAlive(sessionId:BigInt,sender__Ref: Option[ActorRef]):Unit= {
        val senderRef = sender__Ref
        val future = ask(longPollRequestActor, CreateLongPollRequest(sessionId)).mapTo[String] //.pipeTo(sender)
    if (janus.equals("keepalive")) {
                val janusRequestResponse = Future {
                  JanusSessionRequestResponse(janus = janus)
                }
                senderRef match {
                  case Some(sender_ref) =>
                    janusRequestResponse.pipeTo(sender_ref)
                }
                keepAlive(sessionId,senderRef)
              }
              else if (janus.equals("event")) {
               //some fetching of values from server 
                val janusLongPollRequestResponse = Future {
                  JanusLongPollRequestResponse(janus = janus,sender=sender, transaction=transaction,pluginData=Some(pluginData))
                }
                senderRef match {
                  case Some(sender_ref) =>
                    janusLongPollRequestResponse.pipeTo(sender_ref)
                }
                keepAlive(sessionId,senderRef)
              }

def createLongPollRequest: server.Route =
    path("create-long-poll-request") {
      post {
entity(as[JsValue]) {
          json =>
            val sessionID = json.asJsObject.fields("sessionID").convertTo[String]

          val future = ask(janusManagerActor, JanusLongPollRequest(sessionID)).mapTo[JanusSessionRequestResponse]
            onComplete(future) {
              case Success(sessionDetails) =>
                    log.info("janus long poll request created")
                    val jsonResponse = JsObject("longpollDetails" -> sessionDetails.toJson)
                    complete(OK, routeResponseMessage.getResponse(StatusCodes.OK.intValue, ServerMessages.JANUS_SESSION_CREATED, jsonResponse))

              case Failure(ex) =>
                failWith(ex)
            }

           }
          }

现在上面的路由createLongPollRequest第一次运行良好,我可以看到响应,在接下来的尝试中,我得到了一个死信,如下所示

[INFO] [akkaDeadLetter][07/30/2021 12:13:53.587] [demo-Janus-ActorSystem-akka.actor.default-dispatcher-6] [akka://demo-Janus-ActorSystem/deadLetters] Message [com.ifkaar.lufz.janus.models.janus.JanusSessionRequestResponse] from Actor[akka://demo-Janus-ActorSystem/user/ActorManager/ManagerActor#-721316187] to Actor[akka://demo-Janus-ActorSystem/deadLetters] was not delivered. [4] dead letters encountered. If this is not an expected behavior then Actor[akka://demo-Janus-ActorSystem/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

这可能是在第一次迭代responseFuture.pipeTo(sender()后导致问题的原因
当我的后端服务器响应时,有没有一种方法可以在我的akkahttp路由中得到响应?

z0qdvdin

z0qdvdin1#

Actor应该只回复CreateLongPollRequest一次,并且应该只在它有有效数据时才回复。如果轮询失败,Actor应该发出另一个轮询请求。
如果没有演员的细节,就很难给予更多的帮助。

相关问题