akka 执行元在发送Single时获取BufferOverflowException

0lvr5msh  于 2022-11-23  发布在  其他
关注(0)|答案(1)|浏览(128)

我试图发送几百个来自akka演员的http请求,但我得到

akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(places.api.here.com,443,ConnectionPoolSetup(ConnectionPoolSettings(16,1,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.0)...

这是应用程序.conf

http {
          host-connection-pool {
            max-connections = 16
            min-connections = 1
            max-open-requests = 16
          }
        }

这是代码

override def receive: Receive = {
      case Foo(_) => 
       val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
   // do something for the result

我试着用状态来控制

override def receive: Receive = run(0)
def run(openRequests: Int) : Receive = {
  case Foo(_) if openRequests <= 16 => 
     context.become(run(openRequests + 1))
       val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
       responseFuture.foreach(context.become(run(openRequests - 1)))
        //...

无论哪种方式,我都得到了BufferOverflowException的相同异常
如有任何建议,我们将不胜感激

9wbgstp7

9wbgstp71#

Future内异步使用context是个坏主意。context它仅在调用参与者期间有效。
bug是context.become(run(openRequests - 1))使用的是创建FutureopenRequests的值,而不是调用它时的值。因此,当第一个请求完成时,它将调用context.become(run(-1))(这显然是假的),即使可能有15个未完成的请求。
解决方案是在foreach中向您自己发送一条私有消息,而不是直接调用context.become。当参与者处理该消息时,它会减少 current 请求计数,并在必要时发送一个新请求。

相关问题