我正在尝试修复Springboot RabbitMQ AMQP场景中的一个棘手问题。
1.我有多个并发设置为2的队列:
1.应用程序正常启动和使用消息。我运行了这个应用程序的3个示例,所以我通常有6个使用者运行使用消息。
1.过了一段时间(可能是几天),我可以在日志中看到抛出了一个异常,如Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)
。
1.这会关闭消费者线程,AMQP不会重新创建它,这个消费者就消失了。这会发生,直到我没有更多的消费者,然后队列继续接收消息,但没有消费者可以消费它们。
相关信息:
1.这是一个长时间处理的场景,这意味着我使用RabbitMQ来编排ETL工作负载,因此一些消息需要很长时间。我在这里的其他答案中看到,这可能是因为处理消息花费的时间太长,然后超时,所以我改变了消费者来执行手动ACK:
@Component
@Slf4j
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${rabbitmq.queue.name}", durable = "true"),
exchange = @Exchange(value = "${rabbitmq.exchange.name}"),
key = "${rabbitmq.queue.name}"
),
concurrency = "${rabbitmq.queue.concurrency}",
ackMode = "MANUAL")
public class ExampleConsumer {
@RabbitHandler
@CircuitBreaker(name = "CONSUMER_CB", fallbackMethod = "fallback")
@SneakyThrows
public void extract(SomeMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
channel.basicAck(tag, Boolean.FALSE);
handleMessage(message);
}
}
字符串
您可以考虑handleMessage(message)
方法需要很长时间来处理。
1.我看到Garry Russel的answer说:“到目前为止,这类问题最常见的原因是容器线程在用户代码中的某个地方被卡住了-要么在侦听器中,要么在侦听器调用的代码中;例如死锁。
第一步是在下次碰巧看到侦听器容器线程正在做什么时进行线程转储。”
然而,由于这只发生在生产代码中,而且很难知道它何时会发生,所以我不能轻松地获得线程转储。
另外,考虑到我正在进行手动ACK,我是否错误地认为这个错误永远不会发生?因为从表面上看,只有当消费者花了太长时间处理消息并且没有ACK它时,才会发生这种情况,而这里不是这种情况。
我的问题是
- 为什么手动ACK不能解决这个问题?
- 我可以做些什么来让AMQP自动重新创建消费者?
- 如果我不能自动地这样做,那么我如何创建一个监视程序来检查队列的状态,并在队列没有消费者的情况下重新创建消费者呢?
- 有没有什么方法可以让我理解是什么错误导致了这个错误?也许我可以实现一个侦听器来获取内部异常的细节?
1条答案
按热度按时间2hh7jdfx1#
您可以在这里使用以下方法,
列表项:
按照以下步骤创建一个API,
**第一步:**创建Get端点
**第二步:**在Enum或Env中定义所有队列及其所需消费者计数
**第三步:**调用RabbitMQ API
curl -i -u guest:guest http://localhost:15672/api/queues
获取所有队列细节**第4步:**循环遍历每个队列,并检查消费者是否计数
**步骤5:**如果消费者计数较少,则创建缺少的消费者线程数
**第六步:**返回响应
在检查消费者计数时,请确保您也在检查虚拟主机。因为同一队列也可能具有不同的虚拟主机。如果您只有一个虚拟主机,请忽略。
设置CronJob:
在GCP或AWS或任何你想要的地方创建一个cronjob,并以两分钟的间隔调用这个API。如果你不熟悉cronjob,那么你可以在线检查。