我是一个初学者,我正在学习曼宁的 Camel 行动。我正在按照第9章中的规定设置事务管理器:
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://${activemq.host}:${activemq.port}" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="2" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsTxnManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="pooledConnectionFactory" />
</bean>
<bean id="activemq" class="org.apache.camel.component.activemq.ActiveMQComponent">
<property name="connectionFactory" ref="pooledConnectionFactory" />
<property name="transacted" value="true" />
<property name="transactionManager" ref="jmsTxnManager"/>
</bean>
我有一个简单的消费者路由设置:
<route id="exAroute">
<from uri="activemq:{{mq.in}}"/>
<transacted/>
<log message="${body}"/>
</route>
当我运行Camel时,我在一个空队列上每秒都会得到一组DEBUG日志消息:
10:54:30.873 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Creating new transaction with name [JmsConsumer[TEST_IN]]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
10:54:30.873 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Created JMS transaction on Session [PooledSession { ActiveMQSession {id=ID:computer-60122-1666796067926-1:2:1,started=true} java.lang.Object@192cac46 }] from Connection [PooledConnection { ConnectionPool[ActiveMQConnection {id=ID:computer-60122-1666796067926-1:2,clientId=ID:computer-60122-1666796067926-0:1,started=true}] }]
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:computer-60122-1666796067926-1:2:1:2, lastDeliveredSequenceId: -1
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [PooledSession { ActiveMQSession {id=ID:computer-60122-1666796067926-1:2:1,started=true} java.lang.Object@192cac46 }]
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60122-1666796067926-1:2:1 Transaction Commit :null
10:54:31.885 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60122-1666796067926-1:2:1 Transaction Rollback, txid:null
我想知道这是为什么?我的设置不正确吗?
我已经花了2个小时研究这个没有任何运气。我希望你能帮助。提前感谢。
PS:如果我把一条消息放在队列中等待使用,路由会执行得很好:
10:59:21.829 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.TransactionContext - Begin:TX:ID:computer-60151-1666796352835-1:2:1
10:59:21.829 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.component.jms.DefaultJmsMessageListenerContainer - Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] from consumer [PooledMessageConsumer { ActiveMQMessageConsumer { value=ID:computer-60151-1666796352835-1:2:1:5, started=true } }] of transactional session [PooledSession { ActiveMQSession {id=ID:computer-60151-1666796352835-1:2:1,started=true} java.lang.Object@296def58 }]
10:59:21.829 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.component.jms.EndpointMessageListener - activemq://TEST_IN consumer received JMS message: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:computer-60156-1666796361583-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:computer-60156-1666796361583-1:1:1:1, destination = queue://TEST_IN, transactionId = null, expiration = 0, timestamp = 1666796361809, arrival = 0, brokerInTime = 1666796361809, brokerOutTime = 1666796361819, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4db30c0e, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = test}
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.spring.spi.TransactionErrorHandler - Transaction begin (0x76adb233) redelivered(false) for (MessageId: ID:computer-60156-1666796361583-1:1:1:1:1 on ExchangeId: ID-computer-1666796353110-0-1))
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Participating in existing transaction
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] INFO exAroute - test
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.camel.spring.spi.TransactionErrorHandler - Transaction commit (0x76adb233) redelivered(false) for (MessageId: ID:computer-60156-1666796361583-1:1:1:1:1 on ExchangeId: ID-computer-1666796353110-0-1))
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Initiating transaction commit
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.springframework.jms.connection.JmsTransactionManager - Committing JMS transaction on Session [PooledSession { ActiveMQSession {id=ID:computer-60151-1666796352835-1:2:1,started=true} java.lang.Object@296def58 }]
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60151-1666796352835-1:2:1 Transaction Commit :TX:ID:computer-60151-1666796352835-1:2:1
10:59:21.859 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.TransactionContext - Commit: TX:ID:computer-60151-1666796352835-1:2:1 syncCount: 2
10:59:21.869 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:computer-60151-1666796352835-1:2:1:5, lastDeliveredSequenceId: 844
10:59:21.869 [Camel thread #2 - JmsConsumer[TEST_IN]] DEBUG org.apache.activemq.ActiveMQSession - ID:computer-60151-1666796352835-1:2:1 Transaction Rollback, txid:null
1条答案
按热度按时间6ss1mwsb1#
Apache Camel使用Spring JMS模板,Spring的JMS模板在事务模式下的每个消息接收检查循环之后关闭会话和连接。
这会在ActiveMQ代理上造成大量系统颠簸。
修复:将此属性添加到您的o.a.camel.component.activemq.ActiveMQComponent bean中
参考:https://camel.apache.org/components/3.18.x/jms-component.html