Apache Camel multicast blocks

3vpjnl9f  posted on 2022-12-26 in Apache

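I have an application made up of many modules deployed across several JBoss instances. The modules broadcast their version numbers to one another over JMS.
It works as follows: each module periodically broadcasts its own versions, like this: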

camelTemplate.asyncSendBody(moduleVersionsOutputChannel, new ArrayList<>(moduleVersions));

Here is the definition of moduleVersionsOutputChannel:

<endpoint id="moduleVersionsOutputChannel" uri="direct:module.versions.output.channel"/>

Then there is a route that consumes messages from moduleVersionsOutputChannel (called source below) and publishes them to the topics:

from(source).multicast()
        .parallelProcessing().timeout(moduleVersionsMonitoringConfig.getMulticastTimeout())
        .to(moduleVersionsMonitoringConfig.getChannels())
        .id(moduleVersionsService.getCurrentModuleId() + "-outbound");

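At first everything works fine, but after a while (hours or days) the thread performing the multicast blocks. I have observed the blocking at two different points.
The first block occurs in the thread that calls asyncSendBody: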

"moduleVersionsBroadcastScheduler-1@57542" prio=5 tid=0x423 nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
      at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
      at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:328)
      at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:214)
      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
      at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)
      at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
      at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
      at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
      at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
      at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:73)
      at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:378)
      at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:346)
      at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:242)
      at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:346)
      at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
      at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
      at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
      at org.apache.camel.impl.DefaultProducerTemplate$14.call(DefaultProducerTemplate.java:621)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:2025)
      at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
      at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
      at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
      at org.apache.camel.impl.DefaultProducerTemplate.asyncSendBody(DefaultProducerTemplate.java:626)
      at ...
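
The CallerRunsPolicy.rejectedExecution frame in this trace indicates that the executor behind asyncSendBody rejected the task, so the scheduler thread ran the send itself and then parked on the multicast's CountDownLatch. The following is a minimal, plain-JDK sketch of that mechanism only, not Camel's code. The class name, method name and pool sizes are made up for illustration, and it uses Java 8+ lambdas for brevity.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; none of these names come from the application above.
public class CallerRunsDemo {

    /** Submits to a saturated pool and returns the name of the thread that ran the overflow task. */
    static String threadThatRanOverflowTask() {
        final CountDownLatch release = new CountDownLatch(1);
        // One worker, one queue slot, and the same rejection policy seen in the trace.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Callable<Void> blocker = () -> { release.await(); return null; };
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            pool.submit(blocker);  // occupies the only worker thread
            pool.submit(blocker);  // fills the single queue slot
            // Pool and queue are full: the task is rejected and run by the submitting thread.
            Future<String> overflow = pool.submit(whoAmI);
            return overflow.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();   // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("ran task: " + threadThatRanOverflowTask());
    }
}
```

Both lines print the same thread name. In other words, once the pool is saturated an "async" submit becomes a synchronous call on the submitting thread, and anything that blocks inside the task blocks the caller too.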

Another time, execution blocked later, when actually trying to publish the message to the JMS topic:

"Camel (moduleVersionsContext) thread #1044 - Multicast@52031" daemon prio=5 tid=0xa16 nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
      at java.util.concurrent.Semaphore.acquire(Semaphore.java:472)
      at org.hornetq.core.client.impl.ClientProducerCreditsImpl.acquireCredits(ClientProducerCreditsImpl.java:90)
      at org.hornetq.core.client.impl.ClientProducerImpl.sendRegularMessage(ClientProducerImpl.java:307)
      at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:288)
      at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:140)
      at org.hornetq.jms.client.HornetQMessageProducer.doSend(HornetQMessageProducer.java:438)
      at org.hornetq.jms.client.HornetQMessageProducer.send(HornetQMessageProducer.java:205)
      at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:589)
      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:336)
      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:275)
      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:217)
      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:231)
      at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
      at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
      at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:431)
      at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:385)
      at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)
      at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)
      at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
      at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)
      at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
      at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)
      at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
      at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
      at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:712)
      at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:83)
      at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:293)
      at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:278)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:744)
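
In this second trace the multicast worker is parked in Semaphore.acquire, called from HornetQ's ClientProducerCreditsImpl.acquireCredits, which suggests the producer is waiting for send credits that are not being granted. The sketch below is a simplified model of credit-based sending built on a plain java.util.concurrent.Semaphore. It is an assumption about the general idea, not HornetQ's implementation, and every name in it is hypothetical. Unlike the blocking acquire in the trace, it uses a timed tryAcquire so the demo terminates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a credit-limited producer; not HornetQ code.
public class ProducerCreditsSketch {

    private final Semaphore credits;

    ProducerCreditsSketch(int initialCredits) {
        this.credits = new Semaphore(initialCredits);
    }

    /** Tries to "send" a message costing messageSize credits; gives up after timeoutMillis. */
    boolean trySend(int messageSize, long timeoutMillis) {
        try {
            return credits.tryAcquire(messageSize, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Models the other side granting more credits to the producer. */
    void receiveCredits(int amount) {
        credits.release(amount);
    }

    public static void main(String[] args) {
        ProducerCreditsSketch producer = new ProducerCreditsSketch(100);
        System.out.println(producer.trySend(60, 50)); // enough credits available
        System.out.println(producer.trySend(60, 50)); // only 40 left: times out
        producer.receiveCredits(60);                  // credits replenished
        System.out.println(producer.trySend(60, 50)); // succeeds again
    }
}
```

With an untimed acquire(), the second send in main would wait until credits arrive, which matches the WAITING state in the trace above.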

Can you tell me whether there is a mistake in my Camel configuration, or whether this is a known Camel issue with a workaround?
My environment:

  • Apache Camel 2.12.3
  • JBoss EAP 6.1
  • OpenJDK 1.7.0_55
  • Ubuntu 12.04 LTS
