关于@transactional的springboot2.3.8.release实现的问题。需求是实现分布式事务,该事务写入postgresql和artemis队列的示例。如果一个提交失败,那么另一个也应该失败。我们使用atomikos作为jta事务管理器。
我想我已经实现了我需要的一切,但显然没有。当我在服务代码中抛出一个异常来测试回滚功能时,它显然不起作用:即使在我在服务代码中抛出一个异常之后,消息也会被写入artemis。
任何诊断和修复方面的帮助都将不胜感激。如果需要任何额外的细节,请告诉我。
具体实施情况如下:
Spring启动应用:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import xxx.xxx.Users;
import xxx.xxx.TransactionServiceImpl;
@SpringBootApplication
(
exclude = {
DataSourceAutoConfiguration.class,
HibernateJpaAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class,
JmsAutoConfiguration.class,
ActiveMQAutoConfiguration.class,
ArtemisAutoConfiguration.class
}
)
public class BApplication implements CommandLineRunner
{
public static void main(String[] args) throws Exception
{
// SpringApplication.run(BoilerplateApplication.class, args);
ConfigurableApplicationContext ctx = SpringApplication.run(BApplication.class, args);
System.in.read();
ctx.close();
}
@Autowired
TransactionServiceImpl tsi;
@Override
public void run(String... args) throws Exception
{
Users user = new Users();
user.setFirstName("Moe");
user.setGender("M");
user.setLastName("Moe");
tsi.save(user);
}
}
以下是jta配置:
jta配置
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.sql.DataSource;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.springframework.context.annotation.Bean;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.DependsOn;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.postgresql.xa.PGXADataSource;
@Configuration("jtaConfig")
public class JtaConfig
{
private static final Logger appLogger = LoggerFactory.getLogger(JtaConfig.class);
@Value("${amq.broker.url}")
private String brokerUrl;
@Value("${amq.broker.username}")
private String brokerUsername;
@Value("${amq.broker.password}")
private String brokerPassword;
@Value("${postgresql.datasource.url}")
String dataSourceUrl;
@Value("${postgresql.datasource.username}")
String dsUsername;
@Value("${postgresql.datasource.password}")
String dsPassword;
@Value("${postgresql.datasource.driver.classname}")
String dsClassName;
@Value("${postgresql.initial.connections}")
int initialDSConnections;
@Value("${postgresql.max.connections}")
int maxDSConnections;
@Bean(initMethod = "init", destroyMethod = "shutdownForce")
public UserTransactionService userTransactionService()
{
Properties atProps = new Properties();
atProps.put("com.atomikos.icatch.service", "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
return new UserTransactionServiceImp(atProps);
}
@Bean (initMethod = "init", destroyMethod = "close")
@DependsOn("userTransactionService")
public UserTransactionManager atomikosTransactionManager()
{
UserTransactionManager utm = new UserTransactionManager();
utm.setStartupTransactionService(false);
utm.setForceShutdown(true);
return utm;
}
@Bean
@DependsOn("userTransactionService")
public UserTransaction userTransaction()
{
UserTransactionImp ut = new UserTransactionImp();
try
{
ut.setTransactionTimeout(1000);
}
catch (SystemException _e)
{
appLogger.error("Configuration exception.", _e);
return null;
}
return ut;
}
@Bean
public Properties hibernateProperties()
{
Properties hibernateProp = new Properties();
hibernateProp.put("hibernate.dialect", "org.hibernate.dialect.PostgreSQLDialect");
hibernateProp.put("hibernate.hbm2ddl.auto", "create-drop");
hibernateProp.put("hibernate.show_sql", true);
hibernateProp.put("hibernate.max_fetch_depth", 3);
hibernateProp.put("hibernate.jdbc.batch_size", 10);
hibernateProp.put("hibernate.jdbc.fetch_size", 50);
return hibernateProp;
}
@Bean
public JpaVendorAdapter jpaVendorAdapter()
{
return new HibernateJpaVendorAdapter();
}
@Primary
@Bean(name = "pgDataSource1", initMethod = "init", destroyMethod = "close")
public DataSource pgDataSource1()
{
PGXADataSource primaryXaDataSource = new PGXADataSource();
primaryXaDataSource.setUrl(dataSourceUrl);
primaryXaDataSource.setUser(dsUsername);
primaryXaDataSource.setPassword(dsPassword);
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(primaryXaDataSource);
xaDataSource.setUniqueResourceName("primaryXaDs1");
xaDataSource.setMinPoolSize(initialDSConnections);
xaDataSource.setMaxPoolSize(maxDSConnections);
return xaDataSource;
}
@Primary
@Bean(name = "jmsConnectionFactory", initMethod = "init", destroyMethod = "close")
public ConnectionFactory connectionFactory()
{
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
ActiveMQConnectionFactory activeMqXaConnectionFactory = new ActiveMQConnectionFactory();
try
{
activeMqXaConnectionFactory.setBrokerURL(brokerUrl);
activeMqXaConnectionFactory.setUser(brokerUsername);
activeMqXaConnectionFactory.setPassword(brokerPassword);
atomikosConnectionFactoryBean.setUniqueResourceName("jmsXAConnectionFactory");
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMqXaConnectionFactory);
}
catch (JMSException _e)
{
appLogger.info("JMS Configuration Error: " + _e);
_e.printStackTrace();
}
return atomikosConnectionFactoryBean;
}
@PostConstruct
public void postConstructDetails()
{
appLogger.info("Post Construct Start: JtaConfig.");
appLogger.info(" - JMS: Artemis URL: {}", brokerUrl);
appLogger.info(" - Artemis Username: {}", brokerUsername);
appLogger.info(" - Artemis Password: {}", brokerPassword);
appLogger.info(" - DS: PostgreSQL URL: {}", dataSourceUrl);
appLogger.info(" - DS: PostgreSQL Username: {}", dsUsername);
appLogger.info(" - DS: PostgreSQL Password: {}", dsPassword);
appLogger.info(" - DS: PostgreSQL Min Conn: {}", initialDSConnections);
appLogger.info(" - DS: PostgreSQL Max Conn: {}", maxDSConnections);
appLogger.info("Post Construct End: JtaConfig.");
appLogger.info(" ");
}
}
以下是服务配置的实现:
服务配置:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.PostConstruct;
import javax.persistence.EntityManagerFactory;
@Configuration
@EnableTransactionManagement
@ComponentScan(basePackages = "xxx.xxx.service")
public class ServicesConfig
{
private Logger appLogger = LoggerFactory.getLogger(ServicesConfig.class);
@Autowired
JtaConfig jtaConfig;
@Bean(name = "xaJmsTemplate")
public JmsTemplate jmsTemplate()
{
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(jtaConfig.connectionFactory());
jmsTemplate.setPubSubDomain(false);
return jmsTemplate;
}
@Bean(name = "entityManangerFactory")
public EntityManagerFactory entityManagerFactory()
{
LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
factoryBean.setPackagesToScan("xxx.xxx.model");
factoryBean.setDataSource(jtaConfig.pgDataSource1());
factoryBean.setJpaProperties(jtaConfig.hibernateProperties());
factoryBean.setPersistenceUnitName("entityManagerFactoryA");
factoryBean.setJpaVendorAdapter(jtaConfig.jpaVendorAdapter());
factoryBean.afterPropertiesSet();
return factoryBean.getNativeEntityManagerFactory();
}
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager()
{
JtaTransactionManager ptm = new JtaTransactionManager();
ptm.setTransactionManager(jtaConfig.atomikosTransactionManager());
ptm.setUserTransaction(jtaConfig.userTransaction());
return ptm;
}
@PostConstruct
public void postConstructDetails()
{
appLogger.info("Post Construct Start: ServicesConfig.");
appLogger.info(" - JMS: Artemis URL: {}", jtaConfig);
appLogger.info(" - JMS Template: {}", jmsTemplate());
appLogger.info("Post Construct End: ServicesConfig.");
appLogger.info(" ");
}
}
以下是服务实现:
事务服务模板
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import xxx.xxx.Users;
@Service("transactionService")
@Transactional
public class TransactionServiceImpl implements TransactionServiceIntf
{
private static final Logger appLogger = LoggerFactory.getLogger(TransactionServiceImpl.class);
@Autowired
@Qualifier("xaJmsTemplate")
JmsTemplate jmsTemplate;
@Override
public Users save(Users _user)
{
appLogger.info("TransactionServiceImpl: save: Entered.");
Users user = _user;
try
{
if(user == null)
{
appLogger.info("User: Null.");
}
else
{
if(jmsTemplate == null)
{
appLogger.info("JMS Template: Null.");
}
else
{
appLogger.info("JMS Template: Saving.");
jmsTemplate.convertAndSend("crequests", user);
}
}
// The rollback should happen with the exception.
throw new Exception();
}
catch(Exception _e)
{
appLogger.error("Catching exception: " + _e);
}
appLogger.info("TransactionServiceImpl: save: Exiting.");
return user;
}
}
暂无答案!
目前还没有任何答案,快来回答吧!