java 使用'block()`的WebClient请求在整个请求中是否只使用1个线程?即使运行了ExchangeFilterFunction?

11dmarpk  于 2023-02-11  发布在  Java
关注(0)|答案(1)|浏览(311)

我正在运行一个Sping Boot 3.0.1应用程序,使用WebClient来管理HTTP请求(从RestTemplate切换)。我正在以阻塞的方式使用WebClient-发送我的请求如下:
webClient.get().uri(path).headers(headers).retrieve().bodyToMono(responseType).block();
这里已经定义了pathheadersresponseType,下面是我如何声明WebClient Spring Bean的:

@Bean
public WebClient webClient(CloseableHttpAsyncClient httpAsyncClient) {
    return WebClient.builder()
            .clientConnector(new HttpComponentsClientHttpConnector(httpAsyncClient))
            .filter(new CustomResponseHeaderFilter())
            .build();
}

下面是CustomResponseHeaderFilter类的实现(是的,我知道它现在什么也没做):

public class CustomResponseHeaderFilter implements ExchangeFilterFunction {
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        return next.exchange(request);
    }

    @Override
    public ExchangeFilterFunction andThen(ExchangeFilterFunction afterFilter) {
        return ExchangeFilterFunction.super.andThen(afterFilter);
    }

    @Override
    public ExchangeFunction apply(ExchangeFunction exchange) {
        return ExchangeFilterFunction.super.apply(exchange);
    }
}

那么,在我的应用程序的当前状态下,整个请求中运行的代码是在同一个线程上运行的吗?我在另一篇文章here中看到一些信息,说线程暂停了,在后台有一个单独的线程用于网络操作。
更具体地说,block()方法调用是否会影响ExchangeFilterFunction的执行方式?我的意思是,既然我们使用block().filter()方法中的代码是否也与请求运行在同一线程上?
到目前为止,我还没有看到任何关于阻塞和过滤函数的文档,我也不完全确定这在幕后是如何工作的。

**更新:**这与使用MDC直接相关。本质上,我想知道MDC是否可以像整个请求在一个线程上运行一样使用-就像在RestTemplate中使用它一样。我想知道如果我在使用.block()时向logResponse过滤器函数中的MDC上下文添加一个值,它是否可以在请求处理后使用?

谢谢

dfty9e19

dfty9e191#

通常,在reactor中,操作符继续在Thread上工作,在Thread上执行了前一个操作符。此外,WebClient只是下划线http客户端(在您的情况下是Netty或Apache HttpClient)之上的 Package 器。http客户端将维护一个单独的线程池来处理http请求。
这意味着WebClient请求将在调用者线程上执行,但对于网络操作,执行将切换到httpclient-dispatch-#线程,然后继续在该线程上执行,直到阻塞为止。
因此,请求筛选器将在调用者线程上执行,响应筛选器将在http客户端线程httpclient-dispatch-#上执行。
下面是一个示例,可以说明这一点

@Test
void test() {
    CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom().build();
    WebClient webClient = WebClient.builder()
            .baseUrl("https://google.com")
            .clientConnector(new HttpComponentsClientHttpConnector(httpAsyncClient))
            .filter(logRequest())
            .filter(logResponse())
            .build();

    var request = webClient.get()
            .uri("/")
            .retrieve()
            .bodyToMono(String.class)
            .block();}

private ExchangeFilterFunction logRequest() {
    return (clientRequest, next) -> {
        log.info("Request: {} {}", clientRequest.method(), clientRequest.url());
        return next.exchange(clientRequest);
    };
}

private ExchangeFilterFunction logResponse() {
    return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
        log.info("Response Status: {}", clientResponse.statusCode());
        return Mono.just(clientResponse);
    });
}
21:53:54.051 [Test worker] INFO WebClientTest - START
21:53:54.728 [Test worker] INFO WebClientTest - Request: GET https://google.com/
21:53:55.445 [httpclient-dispatch-2] INFO WebClientTest - Response Status: 200 OK
21:53:55.572 [Test worker] INFO WebClientTest - END

相关问题