Dynamic backoff duration in RetryBackoffSpec in Spring WebFlux

Posted by zxlwwiss on 2023-03-18 in Spring
Mono.fromCallable(() -> sendRequestToServer(params))
        .doOnSuccess(result -> handleResponse(result, grantMapEntry, deviceInfo))
        .retryWhen(RetryBackoffSpec
            .backoff(5, ofSeconds(10))
            .doAfterRetry(retryInfo -> {
                  log.warn("retrying");
             })
            .scheduler(hbRetryScheduler)
            .jitter(0.5))
        .subscribe();

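The spec above works fine. The problem is that the server sometimes cannot process the request and returns a 106 response code together with a retryTime in seconds (the request should be retried after that many seconds, with backoff and jitter). How can I use the returned retryTime in the RetryBackoffSpec?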

Answer #1, by wkyowqbh

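You can create a custom Retry implementation that decides how long to wait based on what is in the RetrySignal (for example, an HTTP header in the failed response).
The following example uses the Retry-After header to define custom backoff logic.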

private static class DynamicRetrySpec extends Retry {
    private final int maxRetries;
    private final Duration defaultBackoff;

    public DynamicRetrySpec(int maxRetries, Duration defaultBackoff) {
        this.maxRetries = maxRetries;
        this.defaultBackoff = defaultBackoff;
    }

    @Override
    public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
        return retrySignals.flatMap(this::getRetry);
    }

    private Mono<Long> getRetry(Retry.RetrySignal rs) {
        // Only HTTP 429 (Too Many Requests) is retried; any other failure is propagated as-is.
        if (!(rs.failure() instanceof WebClientResponseException.TooManyRequests)) {
            return Mono.error(rs.failure());
        }
        if (rs.totalRetries() >= maxRetries) {
            log.info("retries exhausted with error: {}", rs.failure().getMessage());
            return Mono.error(rs.failure());
        }
        Duration delay = getBackOffDelayFromHeaders((WebClientResponseException.TooManyRequests) rs.failure());
        log.info("retry {} with backoff {}sec", rs.totalRetries(), delay.toSeconds());
        // Emitting a value after the delay tells Reactor to resubscribe to the source.
        return Mono.delay(delay)
                .thenReturn(rs.totalRetries());
    }

    private Duration getBackOffDelayFromHeaders(WebClientResponseException.TooManyRequests exceptionResponse) {
        String retryAfter = exceptionResponse.getHeaders().getFirst("Retry-After");
        if (retryAfter != null) {
            return Duration.ofSeconds(Integer.parseInt(retryAfter));
        }
        return defaultBackoff;
    }
}
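
One caveat about the header parsing above: Retry-After may carry either a number of seconds or an HTTP-date, and Integer.parseInt throws on the second form. The following is a minimal sketch of a more tolerant parser using only java.time. The class name RetryAfterParser, the Clock parameter, and the choice to fall back to the default backoff on bad input are assumptions made for illustration, not part of the original answer.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper, not part of Reactor or Spring.
final class RetryAfterParser {
    private RetryAfterParser() {}

    /**
     * Turns a Retry-After header value into a delay.
     * Returns defaultBackoff when the value is missing or cannot be parsed.
     */
    static Duration parse(String retryAfter, Duration defaultBackoff, Clock clock) {
        if (retryAfter == null || retryAfter.isBlank()) {
            return defaultBackoff;
        }
        String value = retryAfter.trim();
        try {
            // Form 1: delay in seconds, e.g. "120"
            long seconds = Long.parseLong(value);
            return seconds >= 0 ? Duration.ofSeconds(seconds) : defaultBackoff;
        } catch (NumberFormatException notSeconds) {
            // Not a plain number; try the date form next.
        }
        try {
            // Form 2: HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
            Instant retryAt = ZonedDateTime
                    .parse(value, DateTimeFormatter.RFC_1123_DATE_TIME)
                    .toInstant();
            Duration delay = Duration.between(clock.instant(), retryAt);
            // A date in the past means the caller may retry immediately.
            return delay.isNegative() ? Duration.ZERO : delay;
        } catch (DateTimeParseException notDate) {
            return defaultBackoff;
        }
    }
}
```

With this in place, getBackOffDelayFromHeaders could simply return RetryAfterParser.parse(retryAfter, defaultBackoff, Clock.systemUTC()). Passing the Clock explicitly is only there to make the date branch easy to test.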

Then use it with WebClient:

webClient.get()
    .uri("/api")
    .retrieve()
    .toBodilessEntity()
    .retryWhen(new DynamicRetrySpec(3, Duration.ofSeconds(1)));
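
The question also asks for backoff and jitter on top of the server-provided retryTime, which the class above does not apply. Below is a minimal sketch of one way the delay could be computed in plain Java. It assumes the server's hint replaces the exponential base when present, and that jitter is only added upward so the retry never fires earlier than the server asked. ServerHintedBackoff and nextDelay are hypothetical names, and this is not the algorithm RetryBackoffSpec uses internally.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Reactor.
final class ServerHintedBackoff {
    private ServerHintedBackoff() {}

    /**
     * @param attempt      zero-based retry index, e.g. RetrySignal.totalRetries()
     * @param minBackoff   base delay used when the server gives no hint
     * @param serverHint   retryTime returned by the server, or null if absent
     * @param jitterFactor fraction of the base delay that may be added, 0.0 to 1.0
     * @param random       a value in [0, 1); injected so the result is testable
     */
    static Duration nextDelay(long attempt, Duration minBackoff, Duration serverHint,
                              double jitterFactor, double random) {
        if (jitterFactor < 0.0 || jitterFactor > 1.0) {
            throw new IllegalArgumentException("jitterFactor must be between 0 and 1");
        }
        // Exponential backoff: minBackoff * 2^attempt, with the exponent capped to avoid overflow.
        int exponent = (int) Math.max(0, Math.min(attempt, 16));
        Duration exponential = minBackoff.multipliedBy(1L << exponent);
        // Assumption: a server hint, when present, takes the place of the exponential base.
        Duration base = serverHint != null ? serverHint : exponential;
        // Upward-only jitter: adds between 0 and jitterFactor * base.
        long extraMillis = (long) (base.toMillis() * jitterFactor * random);
        return base.plusMillis(extraMillis);
    }

    /** Convenience overload that draws the random value itself. */
    static Duration nextDelay(long attempt, Duration minBackoff, Duration serverHint,
                              double jitterFactor) {
        return nextDelay(attempt, minBackoff, serverHint, jitterFactor,
                ThreadLocalRandom.current().nextDouble());
    }
}
```

Inside getRetry, the result could be passed to Mono.delay(...) in place of the header-derived delay. How the 106 response exposes retryTime is not shown in the question, so extracting the hint from the failure is left open here.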
