我正在开发一个带有webflux、springdatar2dbc和postgresql的springboot2.4webapp。事实上,我没有成功地处理一个文件中的每一项 Flux
在专用事务中。在我看来,用例非常常见:
从远程api检索项,
配置返回的 Flux
示例,以便在专用事务中处理每个项目。
如果一个项目的事务失败,它不能回滚其他项目的事务。对于提交也是如此。
实际上,我使用了下面的代码,但事情并不像我预期的那样工作,可能是因为我误解了如何使用React上下文:
public ParallelFlux<Item> processAllItems() {
LOGGER.debug("Starting item processing");
return client.findItems()
.parallel()
.runOn(Schedulers.boundedElastic())
.doOnNext(item -> LOGGER.debug("Item: {}", item.getId()))
.flatMap(this::processItem);
}
@Transactional
public Mono<Item> processItem(Item item) {
// Process the item asynchronously, potentially updating a PostgreSQL
}
当通量发射出2个项目时,实际上,我可以在日志中看到 R2dbcTransactionManager
在不同的线程中为每个项创建一个事务。但在此之后,对其中一个事务的任何操作(提交/回滚)也会对另一个事务执行,两者都在同一个线程中。例如,在下面的日志中,一个项目处理失败,并回滚两个事务:
DEBUG [ scheduling-1] b.w.d.u.c.m.ItemProcessor : Starting item processing
DEBUG [ boundedElastic-3] b.w.d.u.c.m.ItemProcessor : Item: 35ZRNT9RVOQK
DEBUG [ boundedElastic-2] b.w.d.u.c.m.ItemProcessor : Item: 3XDSWAMB38KB
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.ItemProcessor.processItem]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [ boundedElastic-2] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.ItemProcessor.processItem]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]] to manual commit
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] to manual commit
ERROR [ reactor-tcp-nio-1] b.w.d.u.c.m.ItemProcessor : Unknown item: 3XDSWAMB38KB
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]]
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Releasing R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] after transaction
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]]
我高度怀疑在单独的线程上处理项目不足以为每个线程启动事务上下文。
让通量中的每个项在自己的事务中处理,并让一些项事务提交,而另一些事务回滚,正确的方法是什么?
提前感谢您的帮助!当做
编辑
我想我找到了这个实现不起作用的原因。
实际上,当一个项目处理失败时,事务会按预期回滚,错误信号会传播到原始流量,而原始流量又会失败。当然,这不是期望的行为,必须捕获错误信号,以确保通量中的其他项目得到独立处理。无论每个项目发生什么,通量都必须成功完成。
下面的代码允许这样做:
public ParallelFlux<Item> processAllItems() {
LOGGER.debug("Starting item processing");
return client.findItems()
.parallel()
.runOn(Schedulers.boundedElastic())
.doOnNext(item -> LOGGER.debug("Item: {}", item.getId()))
.flatMap(item -> processItem(item)
.onErrorResume(t -> Mono.<Item>empty().doOnSuccess(data -> LOGGER.error("Item skipped after error", t)
);
}
唯一我还不清楚的是,在每个发布者及其React上下文中如何处理每个项目的事务上下文:当错误没有被捕获时 onErrorResume
,错误信号回滚其他事务。在springdatar2dbc中,我没有找到实现注解,特别是关于 ReactiveTransactionManager
以及 R2dbcTransactionManager
实现。
请随时提供有关这一点的细节,特别是!提前谢谢!当做
暂无答案!
目前还没有任何答案,快来回答吧!