我试图发送几百个来自akka演员的http请求,但我得到
akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [16]. This means that the request queue of this pool (HostConnectionPoolSetup(places.api.here.com,443,ConnectionPoolSetup(ConnectionPoolSettings(16,1,5,16,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.0)...
这是应用程序.conf
http {
host-connection-pool {
max-connections = 16
min-connections = 1
max-open-requests = 16
}
}
这是代码
override def receive: Receive = {
case Foo(_) =>
val res: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
// do something for the result
我试着用状态来控制
override def receive: Receive = run(0)
def run(openRequests: Int) : Receive = {
case Foo(_) if openRequests <= 16 =>
context.become(run(openRequests + 1))
val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = "http://..."))
responseFuture.foreach(context.become(run(openRequests - 1)))
//...
无论哪种方式,我都得到了BufferOverflowException
的相同异常
如有任何建议,我们将不胜感激
1条答案
按热度按时间9wbgstp71#
在
Future
内异步使用context
是个坏主意。context
它仅在调用参与者期间有效。bug是
context.become(run(openRequests - 1))
使用的是创建Future
时openRequests
的值,而不是调用它时的值。因此,当第一个请求完成时,它将调用context.become(run(-1))
(这显然是假的),即使可能有15个未完成的请求。解决方案是在
foreach
中向您自己发送一条私有消息,而不是直接调用context.become
。当参与者处理该消息时,它会减少 current 请求计数,并在必要时发送一个新请求。