使用带有事务的React式(r2dbc)批处理

u59ebvdq  于 2021-07-08  发布在  Java
关注(0)|答案(2)|浏览(580)

嗨,我有个很重要的问题。我尝试使用reactive r2dbc创建一个批处理,并使用transactional对方法进行注解。但如果我同时使用事务性代码和批处理代码,代码就会挂起,无法工作。下面是代码

@Transactional /***Transaction***/
    @GetMapping("/batchFetchData")
    public Flux<Object> batchFetch() {
        long startTime = System.currentTimeMillis();
        Mono.from(databaseConfiguration.connectionFactory().create())
                .flatMapMany(connection -> Flux.from(connection
                        .createBatch() /***Creating batch***/
                        .add("SELECT * FROM xtable where xId = 232323")
                        .add("SELECT * FROM ytable where yId = 454545")
                        .add("SELECT * FROM ztable where zId = 676767")
                        //.execute()));  /***Execution batch***/
                        .execute())).as(StepVerifier::create)
                .expectNextCount(3) /***Expect count batch***/
                .verifyComplete();  /***Verify batch***/

        LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
    return null;
    }
weylhg0b

weylhg0b1#

你的问题是:

return null;

在React式应用程序中,应该返回mono/flux,即使流中没有任何项,也应该返回 Mono.emtpy 相反。
检查我的示例插入多个记录。
并对试验结果进行逐步验证。
对于webflux应用程序中的事务支持,您必须阅读相关文档,以检查它是否支持一般本地事务或使用它时的限制。
如果事务得到很好的支持,有两种方法可以使用它。
注入 TransactionalOperator (与传统的 TransactionTemplate ) Package 您的业务逻辑。
应用 @Transaction 类或方法的注解。

jei2mxaa

jei2mxaa2#

你正在打破React链。
在React式编程中,在订阅之前什么都不会发生。
那是什么意思,我可以用一个小例子来说明。

// If running this, nothing happens
Mono.just("Foobar");

而:

Mono.just("Foobar").subscribe(s -> System.out.println(s));

将打印:

Foobar

如果你有一个函数,这也适用

public void getString() {
    Mono.just("Foobar");
}

// Nothing happens, you have declared something 
// but it will never get run, no one is subscribing
getString();

您需要做的是:

public Mono<String> getString() {
    // This could be saving to a database or anything, this will now get run
    return Mono.just("Now this code will get run");
}

// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));

发生了什么?在React式编程中,一旦有人订阅mono或flux,程序就会遍历并构建回调链,直到找到开始产生值的生产者(在我的例子中是just语句)。这个阶段称为“装配阶段”。当这个阶段完成后,React链将开始为订阅的任何人生成值。
如果没有人订阅,就不会建立链。
那么谁是订户呢?它通常是价值的最终消费者。因此,例如,发起呼叫的网页或移动应用程序,但如果是发起呼叫的网页(例如cron作业),也可以是您的spring引导服务。
让我们看看你的代码:

@Transactional /***Transaction***/
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
    long startTime = System.currentTimeMillis();

    // Here you declare a Mono but ignoring the return type so breaking the reactive chain
    Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch() /***Creating batch***/
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    //.execute()));  /***Execution batch***/
                    .execute())).as(StepVerifier::create)
            .expectNextCount(3) /***Expect count batch***/
            .verifyComplete();  /***Verify batch***/
            // Here at the end you have no subscriber

    LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));

    // Null is not allowed in reactive chains
    return null;
}

那你怎么解决呢?
你不需要破坏React链。这是基本的React式编程。

@Transactional
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
    long startTime = System.currentTimeMillis();

    // we return here so that the calling client 
    // can subscribe and start the chain
    return Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch()
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    .execute()))
                    .then(); 
                    // then() statement throws away whatever the return 
                    // value is and just signals to the calling client 
                    // when everything is done.       
}

“我不想退货”
这就是 Mono#then 声明用于。当链中的每个部分都完成时,它会发出完成的信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,然后传递值等等 then 声明它只会发出完成的信号,而不返回任何内容(或者实际上它正在返回一个 Mono<Void> 因为在React链中不允许null)。你必须始终返回,这样每一步都可以传递它的完整信号。
我还删除了您在代码中使用的stepverifier,因为它通常用于验证单元测试中的步骤,而不用于生产代码。你可以在这里了解更多。
如果你想学习React式编程,我建议你这样做,因为它是惊人的,我喜欢它,我强烈建议你阅读优秀的React堆文档介绍React式编程,他们将解释什么都不会发生的概念,直到你订阅等。

相关问题