如何处理Spring reactor Mono或Flux中的错误?

nlejzf6q  于 2023-08-02  发布在  Spring
关注(0)|答案(6)|浏览(143)

我有下面的代码重新调谐Mono<Foo>

try {
    return userRepository.findById(id)  // step 1
        .flatMap(user -> barRepository.findByUserId( user.getId())  // step 2
        .map(bar-> Foo.builder().msg("Already exists").build())  // step 3
            .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())  // step 4
                .map(bar-> Foo.builder().msg("Created").build())   // step 5 
            ))
            .doOnError(throwable -> Mono.just(handleError(throwable)));
    } catch(Exception e) {
        
        log.error("from catch block");
        return Mono.just(handleError(e));
        
    }

字符串
如果在步骤1中出现错误(例如用户不存在指定的id),它会被doOnError或try-catch块捕获,还是这两个都不捕获?
如果在步骤2、步骤3、步骤4中发生错误,问题也是一样。
什么是正确的代码,使错误始终被doOnError捕获并消除try-catch?
我正在使用:

public interface UserRepository extends ReactiveMongoRepository<User, String>


barRepository也是如此。
handleError(throwable)简单地执行log.error(e.getMessage())并重新调谐Foo

ffvjumwh

ffvjumwh1#

我认为第一个错误是标题:“Mono or Flux”与错误处理无关。

*Mono最多只能发射一个项目(流一个元素)
*Flux可以发射更复杂的东西(即列表)

要处理错误,您可以遵循以下示例:

return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToMono(ModelYouAreRetrieving.class)
                .doOnError(throwable -> logger.error("Failed for some reason", throwable))
                .onErrorReturn(new ModelYouAreRetrieving(...))
                .block();

字符串

lsmd5eda

lsmd5eda2#

DoOnError只会产生副作用,假设findById会返回一个Mono.Error(),如果失败的话,类似这样的方法应该可以工作。

return userRepository.findById(id)
    .flatMap ( user -> 
        barRepository.findByUserId(user.getId())
        .map((user,bar)-> Foo.builder().msg("Already exists").build())  
        .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
        .map(bar-> Foo.builder().msg("Created").build())

    ))
    .onErrorReturn(throwable -> Mono.just(handleError(throwable)));

字符串
只有当您调用链的阻塞操作,或者在进入React链之前发生运行时错误时,try catch才会起作用。doOn操作不修改链,它们仅用于副作用。因为flatMap需要一个生产者,所以您需要从调用中返回一个Mono,在这种情况下,如果发生错误,那么它只会传播错误。在所有React链中,错误将传播,除非另有处理。

r3i60tvu

r3i60tvu3#

使用Exceptions.propagate(e),它将检查的异常 Package 成可以由onError处理的特殊运行时异常
下面的代码尝试用大写字母覆盖用户属性。现在,当它遇到凯尔时,抛出选中的异常,并从onErrorReturn返回MIKE

@Test
void Test19() {
    Flux.fromIterable(Arrays.asList(new User("jhon", "10000"),
            new User("kyle", "bot")))
        .map(x -> {
            try {
                return toUpper(x);
            } catch (TestException e) {
                throw Exceptions.propagate(e);
            }
        })
        .onErrorReturn(new User("MIKE", "BOT")).subscribe(x -> System.out.println(x));
}

protected final class TestException extends Exception {
    private static final long serialVersionUID = -831485594512095557L;
}

private User toUpper(User user) throws TestException{
    if (user.getName().equals("kyle")) {
        throw new TestException();
    }
    return new User(user.getName().toUpperCase(), user.getProfession().toUpperCase());
}

字符串
产出

User [name=JHON, profession=10000]
User [name=MIKE, profession=BOT]

ctehm74n

ctehm74n4#

@Gianluca平托的最后一行代码也不正确。代码不会被编译。onErrorReturn不适合复杂的错误处理。你应该使用onErrorResume。
请参阅:https://grokonez.com/reactive-programming/reactor/reactor-handle-error#21_By_falling_back_to_another_Flux
onErrorResume将回退到另一个Flux,并让您捕获和管理由前一个Flux引发的异常。如果查看onErrorReturn的实现,你会发现onErrorReturn实际上使用的是onErrorResume。
所以这里的代码应该是:

.onErrorResume(throwable -> Mono.just(handleError(throwable)));

字符串

dluptydi

dluptydi5#

@James Ralston的代码的最后一行是错误的。正确的代码应该是:

return userRepository.findById(id)
.flatMap ( user -> 
    barRepository.findByUserId(user.getId())
    .map((user,bar)-> Foo.builder().msg("Already exists").build())  
    .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())
    .map(bar-> Foo.builder().msg("Created").build())

))
.onErrorReturn(Mono.just(handleError(throwable)));

字符串

qni6mghb

qni6mghb6#

在创建React流时,我们需要使用onError*,因为它提供了一个回退Mono/Flux,而杜恩 * 是副作用操作符。
注:示例在Kotlin中
下面是一个例子:

fun saveItems(item: Item) = testRepository.save(item)
        .onErrorResume {
            Mono.error(
                onErrorResumeHandler(
                    it,
                    "APP-1002",
                    "Error occurred while saving the something :P, contact admin"
                )
            )
        }

fun onErrorResumeHandler(exception: Throwable, errorCode: String, errorMessage: String) =
    if (exception is TestRepositoryException) exception else
        TestServiceException(errorCode, errorMessage)

字符串
应该有一个中央异常处理程序,我们可以通过扩展AbstractErrorWebExceptionHandler来创建。顺序为-2以取代默认值。
下面是一个例子:

@Component
@Order(-2)
class BaseControllerAdvice(
    errorAttributes: ErrorAttributes,
    resources: WebProperties.Resources,
    applicationContext: ApplicationContext,
    serverCodecConfigurer: ServerCodecConfigurer
) : AbstractErrorWebExceptionHandler(errorAttributes, resources, applicationContext) {

    val log = logger()

    init {
        setMessageWriters(serverCodecConfigurer.writers)
    }

    override fun getRoutingFunction(errorAttributes: ErrorAttributes?) =
        router {
            RequestPredicates.all().invoke(this@BaseControllerAdvice::renderErrorResponse)
        }
    //RouterFunctions.route(RequestPredicates.all(),this::renderErrorResponse)

    fun renderErrorResponse(
        request: ServerRequest
    ): Mono<ServerResponse> {
        val errorPropertiesMap = getErrorAttributes(
            request,
            ErrorAttributeOptions.defaults()
        )
        val ex: ApplicationException = getError(request) as ApplicationException
        log.info("Error attributes:{}", request)
        return ServerResponse.status(HttpStatus.BAD_REQUEST)
            .contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromValue(ErrorResponseVO(ex.errorCode, ex.errorMessage)))
    }

    data class ErrorResponseVO(val errorMessage: String, val errorCode: String)

}

相关问题