@jmslistener突然停止使用没有任何错误或异常的消息

qij5mzcb  于 2021-06-29  发布在  Java
关注(0)|答案(1)|浏览(679)

我有一个在生产环境中运行的应用程序,它使用activemq上的一个虚拟主题

@JmsListener(destination = "Consumer.example-consumer.VirtualTopic.${spring.activemq.example.topic}")
public void consumeTopic(String message) {...}

application.properties中的位置

spring.activemq.example.topic=example.topic

另外,我在spring boot-like中配置了活动mq

@Configuration
@EnableJms
public class AmazonMQConfiguration {
    @Value("${spring.activemq.broker-url}")
    public String brokerURL;

    @Value("${spring.activemq.user}")
    public String userName;

    @Value("${spring.activemq.password}")
    public String password;

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerURL);
        factory.setUserName(userName);
        factory.setPassword(password);
        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        return new JmsTemplate(activeMQConnectionFactory());
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory());
        factory.setConcurrency("1-1");
        return factory;
    }
}

但昨天我突然 @JmsListener 停止使用来自虚拟主题的消息,在24小时内,我的使用者队列中有大约82000个挂起的消息。
请帮助我的问题,因为我不知道为什么这个问题发生,无法重新创建它。
下面我将进一步提供我的dependencies pom.xml代码和示例线程转储,当我了解到这个问题时,我使用了这个示例线程转储。
波姆。xml:-

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jersey</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web-services</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- Exclude Spring Boot's Default Logging -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Add Log4j2 Dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

线程dump:-

2020-12-16 15:02:17
Full thread dump OpenJDK 64-Bit Server VM (25.265-b01 mixed mode):

"Attach Listener" #723 daemon prio=9 os_prio=0 tid=0x00007f77a8007800 nid=0x33b6 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ActiveMQ InactivityMonitor Worker" #55 daemon prio=5 os_prio=0 tid=0x00007f77a00bc000 nid=0x6571 waiting on condition [0x00007f7782ffd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000000855ef0a8> (a java.util.concurrent.SynchronousQueue$TransferStack)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-4" #54 daemon prio=5 os_prio=0 tid=0x00007f77b84fb000 nid=0x655c runnable [0x00007f778c5af000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-3" #53 daemon prio=5 os_prio=0 tid=0x00007f77b84f9800 nid=0x655b runnable [0x00007f778c6b0000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-2" #52 daemon prio=5 os_prio=0 tid=0x00007f77b84f7800 nid=0x655a runnable [0x00007f778c7b1000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"reactor-http-epoll-1" #51 daemon prio=5 os_prio=0 tid=0x00007f77b84f6800 nid=0x6559 runnable [0x00007f778c8b2000]
   java.lang.Thread.State: RUNNABLE
    at io.netty.channel.epoll.Native.epollWait(Native Method)
    at io.netty.channel.epoll.Native.epollWait(Native.java:148)
    at io.netty.channel.epoll.Native.epollWait(Native.java:141)
    at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

"DestroyJavaVM" #50 prio=5 os_prio=0 tid=0x00007f77dc04c800 nid=0x651e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"ActiveMQ Transport: ssl://b-d15b4567-aw8b-4k80-8jd9-343hht45ab97-2.mq.ap-south-1.amazonaws.com/172.158.11.158:61617" #44 prio=5 os_prio=0 tid=0x00007f77a8009800 nid=0x6554 runnable [0x00007f778d2c6000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
    - locked <0x00000000855731a0> (a java.lang.Object)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    - locked <0x00000000855c6688> (a sun.security.ssl.AppInputStream)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)

"DefaultMessageListenerContainer-1" #42 prio=5 os_prio=0 tid=0x00007f77dd72c800 nid=0x6552 waiting on condition [0x00007f778d4c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008a8e04d0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
    at reactor.core.publisher.Mono.block(Mono.java:1680)
    at com.main.service.SingularConnectionService.sendDataToSingular(SingularConnectionService.java:38)
    at com.main.consumer.AmazonMQConsumer.consumeTopic(AmazonMQConsumer.java:62)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

"ActiveMQ InactivityMonitor WriteCheckTimer" #40 daemon prio=5 os_prio=0 tid=0x00007f7798038000 nid=0x6550 in Object.wait() [0x00007f778d6ca000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:552)
    - locked <0x000000008556c420> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:505)

"ActiveMQ Transport: ssl://b-d15b4567-aw8b-4k80-8jd9-343hht45ab97-2.mq.ap-south-1.amazonaws.com/172.158.11.158:61617" #38 prio=5 os_prio=0 tid=0x00007f7794571800 nid=0x654e runnable [0x00007f778d8cc000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
    - locked <0x00000000855d9288> (a java.lang.Object)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    - locked <0x00000000855da5e8> (a sun.security.ssl.AppInputStream)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)

"ActiveMQ InactivityMonitor ReadCheckTimer" #37 daemon prio=5 os_prio=0 tid=0x00007f7794210800 nid=0x654d in Object.wait() [0x00007f778d9cd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:552)
    - locked <0x00000000855eebe0> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:505)

"DefaultMessageListenerContainer-1" #36 prio=5 os_prio=0 tid=0x00007f77dd743000 nid=0x654c in Object.wait() [0x00007f778dcce000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
    - locked <0x00000000855eac88> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:132)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:418)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:303)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

"http-nio-8082-Acceptor" #34 daemon prio=5 os_prio=0 tid=0x00007f77dd827800 nid=0x654a runnable [0x00007f778ded0000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:419)
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:247)
    - locked <0x00000000853bcca0> (a java.lang.Object)
    at org.apache.tomcat.util.net.NioEndpoint.serverSocketAccept(NioEndpoint.java:469)
    at org.apache.tomcat.util.net.NioEndpoint.serverSocketAccept(NioEndpoint.java:71)
    at org.apache.tomcat.util.net.Acceptor.run(Acceptor.java:106)
    at java.lang.Thread.run(Thread.java:748)
e5nszbig

e5nszbig1#

单线程转储没有多大帮助,因为它只提供应用程序正在执行的操作的快照。您真正需要的是一系列线程转储,这样您就可以看到线程随着时间的推移在做什么。但是,由于您只提供了一个单线程转储,我想说您的问题是这个线程:

"DefaultMessageListenerContainer-1" #42 prio=5 os_prio=0 tid=0x00007f77dd72c800 nid=0x6552 waiting on condition [0x00007f778d4c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000008a8e04d0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
    at reactor.core.publisher.Mono.block(Mono.java:1680)
    at com.main.service.SingularConnectionService.sendDataToSingular(SingularConnectionService.java:38)
    at com.main.consumer.AmazonMQConsumer.consumeTopic(AmazonMQConsumer.java:62)
    at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

这是实际使用来自主题的消息的线程。请注意,在该方法中它被阻止:

com.main.service.SingularConnectionService.sendDataToSingular()

目前还不清楚这种阻塞会持续多久,但有可能这种阻塞永远不会清除,基本上会阻止您的消费者接收更多的消息。既然你用的是 setConcurrency("1-1") 您只有一个消费者,因此如果它被阻止,则不会再消费更多的消息。
您应该调查这个方法在做什么,并确保您调用任何带有超时的阻塞操作,这样线程就不能无限期地被阻塞。

相关问题