java 如何在Flux onError中获取数据

ars1skjm  于 2023-03-21  发布在  Java
关注(0)|答案(2)|浏览(229)

这个问题并不是针对数据库的,而是用数据库的例子来描述的

  • 我用一个React式驱动程序从数据库中获取一些实体
  • 使用itemMapper将它们转换为Item实体。
  • 保存在数据库中
  • 如果出现错误,则记录错误。

这适用于以下

reactiveTemplate
          .query(cql, itemMapper) // This returns an item object
          .doOnNext(this::saveRecord) //this save item Object
          .doOnError(this::processErrorRecord);  // Logs the exception

但是如果我想记录哪一个项目得到了错误,如何在doOnError方法中访问failedIte?

reactiveTemplate
      .query(cql, itemMapper) // This returns an item object
      .doOnNext(this::saveRecord) //this save item Object
      .doOnError(th -> processErrorRecord(<failed Item>,th)); //how to access the item which got error
n8ghc7c1

n8ghc7c11#

尝试不同的React链嵌套。
您可以将doOnError()直接附加到saveRecord()。然后,您将使record对象仍然在您的上下文中。

reactiveTemplate
    .query(cql, itemMapper) // This returns an item object
    .doOnNext(record -> saveRecord(record) //this save item Object
                         .doOnError(th -> {
                             log.error("error while saving record: {}",record)
                             processErrorRecord(record, th);
                          })
     );
inkz8wg9

inkz8wg92#

在不了解所有细节的情况下很难提出建议,但这里有一些选择。doOnNext是一个操作符,通常用于“副作用”逻辑,如日志记录、指标......以执行非React性代码。我真的不知道为什么saveRecord是非React性的,因为它看起来像是您正在使用reactiveTemplate查询数据。
如果saveRecord正在使用reactiveTemplate,则应返回Mono<Void>(而不是void)。

reactiveTemplate
        .query(cql, itemMapper) // This returns an item object
        .flatMap(record -> 
                saveRecord(record) //this save item Object
                    .doOnError(th -> {
                        log.error("error while saving record: {}", record);
                        processErrorRecord(record, th);
                    })
        );

不确定processErrorRecord的逻辑是什么,但如果这是被动的,请使用onErrorResume而不是doOnError
如果saveRecord不是响应式和阻塞式的,则需要在单独的Scheduler上执行它。

reactiveTemplate
        .query(cql, itemMapper) // This returns an item object
        .flatMap(record ->
                Mono.fromRunnable(() -> saveRecord(record))
                    .subscribeOn(Schedulers.boundedElastic())    
                    .doOnError(th -> {
                        log.error("error while saving record: {}",record)
                        processErrorRecord(record, th);
                    })
        );

相关问题