group在springboot的一个事务中发送kafka消息和db更新

hc2pp10m  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(364)

我需要在一个事务中执行多个操作
产生Kafka信息
更新表a
更新表b
我可以发送消息,并且不更新两个表(a和b)。我无法生成消息并更新其中一个表。
我正在努力用电脑来实现我的目标 @Transactional 注解

import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

 @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
    public void handle(Event approvalEvent) {
        var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

        entityService.approve(entity.getTransactionId());
        logService.logApproval(entity);
        producer.send(approvalEvent);
    }

我做得对吗?

j0pj023g

j0pj023g1#

上述方法的问题在于,您在一个事务中与两个不同的系统(数据库和消息队列)交互。当一个系统上的操作成功而另一个系统上的操作失败时,要处理的场景组合使解决方案变得复杂。
在微服务世界中,有一种模式可以处理完全相同的场景。它被称为发件箱模式。
你可以在这里了解更多。
简短的摘要是,您的数据库中还有一个名为outbox的表,其中包含要发布到消息队列的消息。
在添加\更新实体的db事务中,在发件箱表工具中插入一行,其中包含实体操作的详细信息。
然后从发件箱表异步读取行,并通过poling或使用变更数据捕获发布到消息队列。在这里可以看到一个使用debezium的示例实现。
您的事务代码如下所示。

@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.SERIALIZABLE)
public void handle(Event approvalEvent) {
    var entity = entityService.getLatestVersion(approvalEvent.getTransactionId());

    entityService.approve(entity.getTransactionId());
    logService.logApproval(entity);
    //Outbox is the table containing the records to be published to MQ 
    outboxRepo.save(approvalEvent);
}

相关问题