Spring Boot 使用具有自定义HttpMessageReader的Webclient同步读取响应

fkaflof6  于 2022-12-12  发布在  Spring
关注(0)|答案(1)|浏览(513)

问题

我已经定义了一个CustomHttpMessageReader(它实现了HttpMessageReader<CustomClass>),它能够从服务器读取多部分响应,并将接收到的部分转换为特定类的对象。CustomHttpMessageReader在内部使用DefaultPartHttpMessageReader来实际读取/解析多部分响应。
CustomHttpMessageReader累积DefaultReader读取的部分,并将它们转换为所需的类CustomClass
我已经创建了一个CustomHttpMessageConverter,它对RestTemplate做了同样的事情,但我很难对WebClient做同样的事情。
我总是得到以下异常:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
    at reactor.core.publisher.Flux.blockFirst(Flux.java:2600)
    at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMultipartData(CustomHttpMessageReader.java:116)
    at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMono(CustomHttpMessageReader.java:101)
    at org.springframework.web.reactive.function.BodyExtractors.lambda$readToMono$14(BodyExtractors.java:211)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    ...

请注意,我对异步运行WebClient并不感兴趣,我只是为了将来验证我的应用程序,因为RestTemplate显然是only in maintenance mode,Pivotal/Spring的人建议使用WebClient

我的尝试

据我所知,有些线程是不允许被阻塞的,也就是异常中的netty-nio线程。我试着从我的依赖项中删除netty,这样我就可以只依赖Tomcat了。然而,这似乎没有帮助,因为我得到了另一个异常,解释了我,没有合适的HttpConnector存在(WebClient.Builder抛出的异常)

No suitable default ClientHttpConnector found
java.lang.IllegalStateException: No suitable default ClientHttpConnector found
    at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.initConnector(DefaultWebClientBuilder.java:297)
    at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.build(DefaultWebClientBuilder.java:266)
    at com.company.project.RestClientUsingWebClient.getWebclient(RestClientUsingWebClient.java:160)

我也尝试过在单元测试中执行我的代码,就像启动整个Spring上下文一样。不幸的是结果是一样的。

设置

为了提供更多的细节,下面是前面提到的类的片段。为了更好地理解正在发生的事情,这些类没有完全显示。所有必要的方法都被实现了(例如,阅读器中的canRead())。

CustomHttpMessageReader的第一个字符

我还在类中包括了CustomPart的用法(除了CustomClass),只是为了说明Part的内容也是读的,即被阻止的。

public class CustomHttpMessageReader  implements HttpMessageReader<CustomClass> {

    private final DefaultPartHttpMessageReader defaultPartHttpMessageReader = new DefaultPartHttpMessageReader();

    @Override
    public Flux<CustomClass> read(final ResolvableType elementType, final ReactiveHttpInputMessage message,
                                  final Map<String, Object> hints) {
        return Flux.merge(readMono(elementType, message, hints));
    }

    @Override
    public Mono<CustomClass> readMono(final ResolvableType elementType, final ReactiveHttpInputMessage message,
                                      final Map<String, Object> hints) {
        final List<CustomPart> customParts = readMultipartData(message);
        return convertToCustomClass(customParts);
    }

    private List<CustomPart> readMultipartData(final ReactiveHttpInputMessage message) {
        final ResolvableType resolvableType = ResolvableType.forClass(byte[].class);
        return Optional.ofNullable(
                        defaultPartHttpMessageReader.read(resolvableType, message, Map.of())
                                .buffer()
                                .blockFirst()) // <- EXCEPTION IS THROWN HERE!
                .orElse(new ArrayList<>())
                .stream()
                .map(part -> {
                    final byte[] content = Optional.ofNullable(part.content().blockFirst()) //<- HERE IS ANOTHER BLOCK
                            .map(DataBuffer::asByteBuffer)
                            .map(ByteBuffer::array)
                            .orElse(new byte[]{});

                    // Here we cherry pick some header fields
                    return new CustomPart(content, someHeaderFields);
                }).collect(Collectors.toList());
    }
}

WebClient的用法

class RestClientUsingWebClient  {

    /**
     * The "Main" Method for our purposes
     */
    public Optional<CustomClass> getResource(final String baseUrl, final String id) {
        final WebClient webclient = getWebclient(baseUrl);

        //curl -X GET "http://BASE_URL/id" -H  "accept: multipart/form-data"
        return webclient.get()
                .uri(uriBuilder -> uriBuilder.path(id).build()).retrieve()
                .toEntity(CustomClass.class)
                .onErrorResume(NotFound.class, e -> Mono.empty())
                .blockOptional() // <- HERE IS ANOTHER BLOCK
                .map(ResponseEntity::getBody);

    }

    //This exists also as a Bean definition
    private WebClient getWebclient(final String baseUrl) {
         final ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() 
                 .codecs(codecs -> {
                     codecs.defaultCodecs().maxInMemorySize(16 * 1024 * 1024);
                     codecs.customCodecs().register(new CustomHttpMessageReader()); // <- Our custom reader
                 }) 
                 .build();

         return WebClient.builder()
                 .baseUrl(baseUrl)
                 .exchangeStrategies(exchangeStrategies)
                 .build();
    }
}

build.gradle的用法

为了完整起见,下面是我认为是我的build.gradle的相关部分

plugins {
    id 'org.springframework.boot' version '2.7.2'
    id 'io.spring.dependency-management' version '1.0.13.RELEASE'
    ...
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'  // <- This 
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    // What I tried:
    // implementation ('org.springframework.boot:spring-boot-starter-webflux'){
    //    exclude group: 'org.springframework.boot', module: 'spring-boot-starter-reactor-netty'
    //}
...
}
2g32fytz

2g32fytz1#

如果我们查看您提供的堆栈跟踪,我们会看到以下3行

at reactor.core.publisher.Flux.blockFirst(Flux.java:2600)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMultipartData(CustomHttpMessageReader.java:116)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMono(CustomHttpMessageReader.java:101)

它们应该从下往上读。那么它们告诉我们什么呢?
最下面一行告诉我们,CustomHttpMessageReader.java类中101行上的函数readMono首先被调用。
然后,该函数调用CustomHttpMessageReader类(与上面的类相同)中116行上的函数readMultipartData
然后在Flux类的2600行调用函数blockFirst
这是你的阻塞电话。
因此,我们可以知道函数readMultipartData中存在阻塞调用。
那么为什么我们不能在函数中阻止呢?如果我们在API中查找函数覆盖HttpMessageReader的接口,我们可以看到函数返回Mono<T>,这意味着函数是async函数。
如果它是async,而我们是block,我们可能会得到非常非常差的性能。
此接口在Spring WebClient中使用,Spring WebClient是一个完整的async客户端。
您可以在非异步应用程序中使用它,但这意味着您可以在WebClient外部阻塞,但在内部,如果您希望它尽可能高效,它需要完全异步运行。
因此,最重要的一点是,您不应该阻塞任何返回MonoFlux的函数。

相关问题