我有一个3节点的rabbitmq集群,其中一个节点有一个持久的非镜像经典队列,名为test-queue
。
我有一个spring boot应用程序,它使用spring-AMQP默认连接工厂new CachingConnectionFactory()
来首先确保队列存在,然后订阅它的消息。
然后,我开始对rabbitmq集群进行滚动更新,其中的节点一个接一个地重新启动。
在此过程中,我从日志中观察到以下内容:
启动后,我看到以下输出
Received shutdown signal for consumer tag=amq.ctag-pzPHM_GEd5e-J5Y_L2W7_g com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
...
org.springframework.amqp.rabbit.connection.CachingConnectionFactory[m][] - Attempting to connect to: xxx:5672
...
org.springframework.amqp.rabbit.connection.CachingConnectionFactory[m][] - Created new connection: xxx#66971f6b:58/SimpleConnection@4315e774
这表明应用程序收到了关闭信号并成功重新连接。此时,看起来具有队列的节点已关闭,但应用程序能够建立新连接,因为存在其他节点
后来,我看到更多的关闭信号,这表明其他节点开始关闭
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Consumer raised exception, processing can restart if the connection factory supports it com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced
同时,我注意到下面的日志,它表明虽然连接,spring amqp找不到队列。我猜这是因为节点有队列关闭。Spring amqp可能正在检查其他节点。它认为队列不存在,所以开始重新创建队列。还注意到有一个重试限制,它是3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer[m][] - Failed to declare queue: test-queuey
Queue declaration failed; retries left=3 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[test-queue]
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - queue 'test-queue' in vhost '/' process is stopped by supervisor, class-id=50, method-id=10)
最后,重试次数耗尽。我注意到以下情况。看起来Spring amqp放弃了,并开始关闭所有内容。最终状态是,没有消费者注册到队列。Spring应用程序仍在运行,但无法获取消息。它不再像处理断开连接时那样重试。解决方法是重新启动应用程序。
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Cancelling Consumer@7f74d6dd: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@xxx:5672/,26), conn: Proxy@65ef722a Shared Rabbit Connection: SimpleConnection@4315e774 [delegate=amqp://guest@xxx:5672/, localPort= 37208], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer[m][] - Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@xxx:5672/,26), conn: Proxy@65ef722a Shared Rabbit Connection: SimpleConnection@4315e774 [delegate=amqp://guest@xxx:5672/, localPort= 37208]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory[m][] - Closing cached Channel: AMQChannel
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Stopping container from aborted consumer
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer[m][] - Shutting down Rabbit listener container
我得到了springamqp附带的一个重试断开连接的逻辑,如何使Spring等待群集重新启动完成,然后开始重新连接?或者,是否有一种方法可以禁用队列检查重试限制,以便它将继续检查队列,直到群集重新启动完成,而不是提前放弃?将队列更改为镜像队列或仲裁队列是否可以解决此问题?
1条答案
按热度按时间gajydyqb1#
请参阅
https://docs.spring.io/spring-amqp/docs/current/reference/html/#declarationRetries
被动队列声明失败时的重试次数。被动队列声明发生在使用者启动时,或从多个队列使用时,或在初始化过程中并非所有队列都可用时。如果在重试次数用完后没有配置的队列可以被动声明(出于任何原因),则容器行为由前面所述的“missingQueuesFatal”属性控制。
和
https://docs.spring.io/spring-amqp/docs/current/reference/html/#failedDeclarationRetryInterval
被动队列声明重试尝试之间的间隔。被动队列声明发生在使用者启动时,或者在从多个队列使用时,在初始化期间并非所有队列都可用时。
您可以从默认值(分别为3和5000)中增加一个或两个值。