springkafka和springcloudstream允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:
@Transactional
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public PersonEvent process(PersonEvent data) {
logger.info("Received event={}", data);
Person person = new Person();
person.setName(data.getName());
if(shouldFail.get()) {
shouldFail.set(false);
throw new RuntimeException("Simulated network error");
} else {
//We fail every other request as a test
shouldFail.set(true);
}
logger.info("Saving person={}", person);
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
}
在这个节选中,有一个读Kafka主题,一个写在数据库中,另一个写到另一个Kafka主题,所有这些都是事务性的。
我想知道的是,而且我想回答的是,这在技术上是如何实现和实施的。
既然datasource和kafka不参与xa事务(两阶段提交),那么实现如何保证本地事务能够以事务方式从kafka读取、提交到数据库并写入kafka?
1条答案
按热度按时间mkshixfv1#
没有保证,只有Kafka本身。
spring提供了事务同步,因此提交是紧密的,但是db有可能提交,而kafka没有。所以你必须处理复制品的可能性。
当直接使用SpringKafka时,正确的方法不是使用
@Transactional
但是使用ChainedKafkaTransactionManager
在侦听器容器中。请参阅事务同步。
另请参阅spring中的分布式事务,包括xa和后台的“best efforces 1pc模式”。
但是,对于stream,不支持链式事务管理器,因此
@Transactional
是必需的(使用db事务管理器)。这将为链式tx管理器提供类似的结果,db首先提交,就在kafka之前。