springkafka/springcloudstream如何保证涉及数据库和kafka的事务性/原子性?

0dxa2lsx  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(799)

springkafka和springcloudstream允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:

@Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());

        if(shouldFail.get()) {
            shouldFail.set(false);
            throw new RuntimeException("Simulated network error");
        } else {
            //We fail every other request as a test
            shouldFail.set(true);
        }
        logger.info("Saving person={}", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }

在这个节选中,有一个读Kafka主题,一个写在数据库中,另一个写到另一个Kafka主题,所有这些都是事务性的。
我想知道的是,而且我想回答的是,这在技术上是如何实现和实施的。
既然datasource和kafka不参与xa事务(两阶段提交),那么实现如何保证本地事务能够以事务方式从kafka读取、提交到数据库并写入kafka?

mkshixfv

mkshixfv1#

没有保证,只有Kafka本身。
spring提供了事务同步,因此提交是紧密的,但是db有可能提交,而kafka没有。所以你必须处理复制品的可能性。
当直接使用SpringKafka时,正确的方法不是使用 @Transactional 但是使用 ChainedKafkaTransactionManager 在侦听器容器中。
请参阅事务同步。
另请参阅spring中的分布式事务,包括xa和后台的“best efforces 1pc模式”。
但是,对于stream,不支持链式事务管理器,因此 @Transactional 是必需的(使用db事务管理器)。这将为链式tx管理器提供类似的结果,db首先提交,就在kafka之前。

相关问题