我有以下基于HTTP的应用程序,它将每个请求路由到一个Akka Actor,该Actor使用一个长链的Akka Actor来处理请求。
path("process-request") {
post {
val startedAtAsNano = System.nanoTime()
NonFunctionalMetrics.requestsCounter.inc()
NonFunctionalMetrics.requestsGauge.inc()
entity(as[Request]) { request =>
onComplete(distributor ? [Response](replyTo => Request(request, replyTo))) {
case Success(response) =>
NonFunctionalMetrics.requestsGauge.dec()
NonFunctionalMetrics.responseHistogram.labels(HttpResponseStatus.OK.getCode.toString).observeAsMicroseconds(startedAtAsNano, System.nanoTime())
complete(response)
case Failure(ex) =>
NonFunctionalMetrics.requestsGauge.dec()
NonFunctionalMetrics.responseHistogram.labels(HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode.toString).observeAsMicroseconds(startedAtAsNano, System.nanoTime())
logger.warn(s"A general error occurred for request: $request, ex: ${ex.getMessage}")
complete(InternalServerError, s"A general error occurred: ${ex.getMessage}")
}
}
}
}
正如您所看到的,我向distributor
发送了一个ask
请求以获得响应。
问题在于,在RPS非常高的情况下,distributor
有时会失败,并出现以下异常:
2022-04-16 00:36:26.498 WARN c.d.p.b.http.AkkaHttpServer - A general error occurred for request: Request(None,0,None,Some(EntitiesDataRequest(10606082,0,-1,818052,false))) with ex: Ask timed out on [Actor[akka://MyApp/user/response-aggregator-pool#1374579366]] after [5000 ms]. Message of type [com.dv.phoenix.common.pool.WorkerPool$Request]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
这是一个典型的非信息性异常,正常处理时间大约为700
微秒5
秒。由于处理时间不可能太长,因此它必须停留在管道的某个位置。
我想监控这一点,我考虑添加Kamon
集成,为Akka Actors
模块提供邮箱等。
我尝试添加以下配置,但不起作用:https://kamon.io/docs/latest/instrumentation/akka/ask-pattern-timeout-warning/(未显示任何效果)
是否有其他建议来了解高RPS系统上此问题的原因?
谢谢你!
1条答案
按热度按时间4xy9mtcn1#
Kamon工具对于你如何找到卖出价很有用,如果你有很多地方卖出价可能超时,它会很有用,但否则它不太可能告诉你问题所在。
这是因为询问超时几乎总是一些其他问题的症状(唯一的例外是如果许多询问可能在流中(例如在
mapAsync
或ask
阶段中)合理地完成但没有完成;假设超时不是由数据库关闭导致得不到回复或集群失败(这两种情况都是显而易见得,因此我得假设是)引起得,那么超时(通常是任何超时)得原因通常是队列中有太多元素(“饱和”).我们将从
distributor
开始,它是一个参与者,一次处理来自其邮箱的一条消息(这是一个队列)。当您说正常处理时间是700微秒时,这是衡量分发服务器处理一个请求所花费的时间(即,在它可以处理下一个请求之前的时间)?如果是这样,并且distributor
占用700微秒,但每600微秒就有一个请求,则可能发生以下情况:distributor
(邮箱深度0)distributor
的邮箱中排队(邮箱深度1)等等:延迟和深度无限制地增加。最终,深度是这样的,请求在邮箱中花费5秒钟(甚至更多)。
Kamon能够跟踪参与者邮箱中的邮件数量(建议仅对特定参与者执行此操作)。在本例中,跟踪
distributor
的邮箱深度将显示其增长,但无法确认是否发生了这种情况。如果
distributor
的邮箱队列太长,首先考虑请求N如何影响请求N + 1。只有在对请求的响应可能受到其前一个请求的影响时,才严格需要执行元的一次一个处理模型。如果请求仅涉及系统整体状态的某一部分,则该请求可以与不涉及该部分的任何部分的请求并行处理。如果存在总体状态的不同部分,使得没有请求涉及2个或更多部分,则可以将状态的每个部分的责任卸载给特定行动者,并且分发者仅查看每个请求足够长的时间以确定将请求转发到哪个行动者(注意,这通常将不需要分发者作出询问:这基本上就是Cluster Sharding在幕后所做的事情,而且值得注意的是,这样做可能会增加低负载下的延迟(因为您要做更多的工作),但会增加峰值吞吐量,最多增加状态部分的数量。如果这不是解决分销商邮箱饱和问题的可行方法(也就是说,没有好的方法来划分状态),那么您至少可以通过在请求消息中包含“respond-by”字段来限制请求在邮箱中花费的时间(例如对于5秒的询问超时,您可能需要4900米利斯之前的响应。当分发服务器开始处理消息并且响应时间已过时,它移动到下一个请求:有效地这样做意味着当邮箱开始饱和时,消息处理速率增加。
当然,有可能分发服务器的邮箱不是即将饱和的队列,或者即使是,也不是因为参与者花费了太多时间处理消息,而是分发服务器(或响应所需的其他参与者)没有处理消息。
执行元在调度程序中运行,调度程序能够拥有一定数量的执行元如果在它们各自的邮箱中具有消息的行动者比可以处理消息的数目更多,则可以将它们视为在给定时间处理消息的行动者(或
Future
回调或其它任务,它们中的每一个可以被视为等效于为处理单个消息而产生的行动者)。那些参与者在队列中等待调度(注意,即使您碰巧具有将产生处理消息所需的尽可能多的线程的调度器,这也适用:由于CPU内核的数量有限,OS内核调度程序的队列将充当调度程序队列的角色)。Kamon可以跟踪此队列的深度。根据我的经验,检测正在发生的线程饥饿(基本上是任务提交和任务开始执行之间的时间是否超过某个阈值)更有价值。Lightbend的商业工具包可与Akka一起使用(免责声明:我受雇于Lightbend)提供了一些工具,用于以最小的开销检测是否正在发生饥饿,并提供其他诊断信息。如果观察到线程资源不足,以及垃圾收集暂停或CPU限制等情况(例如,由于在容器中运行)被排除,饥饿的主要原因是(或类似于执行元的东西)处理消息的时间太长,这可能是因为它们正在执行阻塞I/O或在处理单个消息时做了太多工作。如果阻塞I/O是问题的原因,尝试将I/O移动到在线程池中运行的actors或future,该线程池中的线程数远远超过CPU内核数(有些人甚至主张使用无边界线程池来实现此目的)。如果在处理单个消息时需要进行过多的计算,在处理过程中寻找合适的点,在这些点上捕获消息中剩余计算所需的状态,并将该消息发送给您自己(这基本上等同于一个协程让步)。