看起来我还没有完全理解XA事务是如何工作的。我以为它是原子的:我认为当我提交一个事务时,新消息和新数据将同时可用。
这种误解使我产生了以下问题:新的行被插入到数据库中,消息被发送到事务路由中的队列。在另一个路由中,消息被接收。然后,这个路由尝试对在前一个路由中插入的行执行一些操作。但是它看不到它们!
第二个路由被配置为在发生异常时将消息回滚到队列中。我看到在第二次运行后,路由看到了行!
作为结论,我想提出以下几个问题:
- XA事务真的是原子事务吗?
1.如果不是,我如何配置事务资源的提交顺序?
**附加说明:**该问题在Fuse ESB/ServiceMix 4.4.1中发现
**2 Jake:**我的camel上下文配置如下所示:
<osgi:reference id="osgiPlatformTransactionManager" interface="org.springframework.transaction.PlatformTransactionManager"/>
<osgi:reference id="osgiJtaTransactionManager" interface="javax.transaction.TransactionManager"/>
<osgi:reference id="myDataSource"
interface="javax.sql.DataSource"
filter="(osgi.jndi.service.name=jdbc/postgresXADB)"/>
<bean id="PROPAGATION_MANDATORY" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="osgiPlatformTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_MANDATORY"/>
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="osgiPlatformTransactionManager"/>
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
</bean>
<bean id="jmstx" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="jmsTxConfig" />
</bean>
<bean id="jmsTxConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsXaPoolConnectionFactory"/>
<property name="transactionManager" ref="osgiPlatformTransactionManager"/>
<property name="transacted" value="false"/>
<property name="cacheLevelName" value="CACHE_NONE"/>
<property name="concurrentConsumers" value="${jms.concurrentConsumers}" />
</bean>
<bean id="jmsXaPoolConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory">
<property name="maxConnections" value="${jms.maxConnections}" />
<property name="connectionFactory" ref="jmsXaConnectionFactory" />
<property name="transactionManager" ref="osgiJtaTransactionManager" />
</bean>
<bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL" value="${jms.broker.url}"/>
<property name="redeliveryPolicy">
<bean class="org.apache.activemq.RedeliveryPolicy">
<property name="maximumRedeliveries" value="-1"/>
<property name="initialRedeliveryDelay" value="2000" />
<property name="redeliveryDelay" value="5000" />
</bean>
</property>
</bean>
DB数据源配置如下:
<bean id="myDataSource" class="org.postgresql.xa.PGXADataSource">
<property name="serverName" value="${db.host}"/>
<property name="databaseName" value="${db.name}"/>
<property name="portNumber" value="${db.port}"/>
<property name="user" value="${db.user}"/>
<property name="password" value="${db.password}"/>
</bean>
<service ref="myDataSource" interface="javax.sql.XADataSource">
<service-properties>
<entry key="osgi.jndi.service.name" value="jdbc/postgresXADB"/>
<entry key="datasource" value="postgresXADB"/>
</service-properties>
</service>
2条答案
按热度按时间tgabmvqs1#
我不是这方面的Maven,但我的观点是XA提供的原子性只保证:
我不认为任何关于单个参与者在同一时刻完成的保证,也没有任何类型的“提交依赖树”来保证后续处理只发生在已经提交的参与者身上。
我认为要实现您想要的,您可能需要将消息队列放在主事务之外......这首先破坏了事务的全部意义:(
我认为您可能只需要在下游处理中放置一个重试/超时循环。另一种方法可能是探索并发选项,看看是否允许下游事务“看到"上游事务。
希望这个答案能促使对这方面更了解的人加入进来!
qv7cva1a2#
对于XA事务,当您提交时,camel将为每个DB运行
xa commit
,确保所有XA事务最终都会被提交,而不是同时提交。由于每个DB中的数据是单独提交的,因此它不是原子的,并且不可能跨原子数据库进行数据修改。对于您的应用程序,可能有两种选择来避免该问题。
1.不要使用XA事务,而是使用OutBox模式。您可以更新部分数据,将消息发送到队列,然后返回。任何依赖于订单的操作都被放入队列,在那里您可以轻松地自定义订单。
1.基于您的XA解决方案,当您想读取最新的数据时,您调用
select for update
,它将等待未完成的XA事务持有的行锁,当XA事务返回时,select for update
将返回最新的数据。您的
AMQ_SCHEDULED_DELAY
标头是一种变通方法,但在发生异常时不起作用。