Akka HTTP:在将来阻止会阻止服务器

3j86kqsm  于 2022-11-05  发布在  其他
关注(0)|答案(2)|浏览(185)

我正在尝试使用Akka HTTP对我的请求进行基本身份验证。碰巧我有一个外部资源需要通过它进行身份验证,所以我必须对这个资源进行一个rest调用。
这需要一些时间,在处理过程中,我的API的其余部分似乎被阻塞,等待这个调用。我用一个非常简单的例子重现了这一点:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()

val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

如果我向日志端点发布,我的get端点也会等待5秒,这是日志端点规定的。
这是预期的行为吗?如果是,我如何在不阻塞整个API的情况下进行阻塞操作?

6ojccjat

6ojccjat1#

你所观察到的是预期的行为--当然这是非常糟糕的。好在存在已知的解决方案和最佳实践来防范它。在这个答案中,我想花一些时间来解释这个问题,简短、详细,然后深入--享受阅读吧!

简短回答:“* 不要阻塞路由基础结构!*",请始终使用专用调度程序进行阻塞操作!
**观察到的症状的原因:**问题是您使用context.dispatcher作为阻塞future执行的调度程序。路由基础设施使用相同的调度程序(简单地说就是“一堆线程”)来实际处理传入的请求--因此,如果您阻塞所有可用的线程,您最终会使路由基础设施挨饿。(一个有待讨论和基准测试的事情是,Akka HTTP是否可以防止这种情况,我会将其添加到我的研究待办事项列表中)。

必须特别小心地处理阻塞,以免影响同一个调度程序的其他用户(这就是为什么我们可以如此简单地将执行分离到不同的调度程序上),如Akka文档部分所述:阻塞需要仔细管理。
我想在这里引起注意的另一件事是,应该 * 尽可能避免阻塞API * --如果您的长时间运行操作实际上不是一个操作,而是一系列操作,您可以将它们分离到不同的参与者,或者按顺序进行后续操作。无论如何,我只想指出--如果可能,避免此类阻塞调用,然而,如果你不得不-那么下面解释了如何正确地处理这些。

深入分析和解决方案

现在我们知道了什么是错误的,从概念上讲,让我们看看上面的代码中到底是什么被破坏了,以及这个问题的正确解决方案是什么样子的:
颜色=线程状态:

  • 绿松石色-睡眠
  • 橙子-等待中
  • 绿色-可运行

现在,让我们研究3段代码,以及它们如何影响调度程序和应用程序的性能。为了强制执行此行为,应用程序已置于以下负载下:

  • [a]继续请求GET请求(请参见上面第一个问题中的代码),它不会在那里阻塞
  • [B]然后,过一段时间后,引发2000个POST请求,这将导致在返回未来的POST请求之前阻塞5秒
    1) [bad]错误代码上的调度程序行为
// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

因此,我们将我们的应用暴露给[a]负载,您可以看到许多akka.actor.default-dispatcher线程-它们正在处理请求-绿色的小片段,橙子表示其他线程实际上处于空闲状态。

然后我们启动[B] load,这会导致这些线程阻塞-您可以看到早期线程“default-dispatcher-2,3,4”在之前空闲后进入阻塞状态。我们还观察到池在增长-新线程被启动“default-dispatcher-18,19,20,21...”,然而它们立即进入休眠状态(!)-我们在这里浪费了宝贵的资源!
这样启动的线程的数量取决于默认的调度器配置,但可能不会超过50个左右。由于我们刚刚触发了2k个阻塞操作,我们使整个线程池饥饿-阻塞操作占主导地位,以至于路由基础结构没有可用的线程来处理其他请求-非常糟糕!
让我们对此做些什么(顺便提一下,这是Akka的最佳实践-总是隔离阻塞行为,如下所示):

2) [good!]调度程序行为良好的结构化代码/调度程序

在您的application.conf中,配置专用于阻塞行为的此调度程序:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

你应该阅读更多的Akka Dispatchers文档,以了解这里的各种选项。但主要的一点是,我们选择了一个ThreadPoolExecutor,它有一个硬限制的线程,它保持可用于阻塞操作。大小设置取决于你的应用做什么,以及你的服务器有多少核心。
接下来,我们需要使用它,而不是默认值:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

我们使用相同的负载向应用施加压力,首先是一些普通请求,然后添加阻塞请求。这是线程池在这种情况下的行为:

因此,最初的正常请求很容易由默认调度程序处理,您可以在那里看到几条绿色线-这是实际的执行(我并没有真正将服务器置于重负载下,因此它大多数时间是空闲的)。
现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-*会启动,并启动到配置的线程数。它会处理其中的所有休眠线程。此外,在这些线程上没有任何操作发生的一段时间后,它会关闭它们。如果我们用另一组阻塞操作攻击服务器,池将启动新的线程来处理休眠()-消化他们,但与此同时-我们没有浪费我们宝贵的线程上“只是呆在那里,什么也不做”。
当使用这种设置时,正常GET请求的吞吐量不会受到影响,它们仍然可以很好地在(仍然相当空闲的)默认调度程序上得到服务。

  • 这是处理React式应用程序中任何类型的阻塞的推荐方法。它通常被称为“隔离”(或“隔离”)应用程序的不良行为部分,在这种情况下,不良行为是休眠/阻塞。*
    正确应用blocking时的3) [workaround-ish]调度程序行为

在本例中,我们使用scaladoc for scala.concurrent.blocking方法,该方法可以在遇到阻塞操作时提供帮助。它通常会使更多线程加速旋转,以便在阻塞操作中幸存下来。

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

应用程序的行为如下:

您会注意到,* 大量 * 新线程被创建,这是因为阻塞提示“哦,这会阻塞,所以我们需要更多线程”。这会导致阻塞的总时间比1)示例中的要短,但是,在阻塞操作完成后,我们有数百个线程什么也不做...当然,它们最终会被关闭(FJP可以做到这一点),但在一段时间内,我们将有大量(不受控制的)线程在运行,这与2)解决方案形成了鲜明对比,在2)解决方案中,我们确切地知道有多少线程专门用于阻塞行为。

总结:从不阻止默认调度程序:-)

最佳实践是使用**2)**中所示的模式,以便为阻塞操作提供一个调度程序,并在那里执行这些操作。

讨论的Akka HTTP版本2.0.1
**使用的性能分析器:**许多人私下问我,我使用了什么性能分析器来显示上图中的线程状态,因此在此添加以下信息:我使用了YourKit,这是一个很棒的商业分析器(对OSS免费),尽管使用免费的VisualVM from OpenJDK也可以达到同样的效果。

kgsdhlau

kgsdhlau2#

奇怪,但对我来说一切都很好(没有阻塞)。下面是代码:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future

object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

您也可以将异步代码封装到onCompleteonSuccess指令中:

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}

相关问题