问题
我已经定义了一个CustomHttpMessageReader
(它实现了HttpMessageReader<CustomClass>
),它能够从服务器读取多部分响应,并将接收到的部分转换为特定类的对象。CustomHttpMessageReader
在内部使用DefaultPartHttpMessageReader
来实际读取/解析多部分响应。CustomHttpMessageReader
累积DefaultReader读取的部分,并将它们转换为所需的类CustomClass
。
我已经创建了一个CustomHttpMessageConverter
,它对RestTemplate
做了同样的事情,但我很难对WebClient
做同样的事情。
我总是得到以下异常:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
at reactor.core.publisher.Flux.blockFirst(Flux.java:2600)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMultipartData(CustomHttpMessageReader.java:116)
at com.company.project.deserializer.multipart.CustomHttpMessageReader.readMono(CustomHttpMessageReader.java:101)
at org.springframework.web.reactive.function.BodyExtractors.lambda$readToMono$14(BodyExtractors.java:211)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
...
请注意,我对异步运行WebClient
并不感兴趣,我只是为了将来验证我的应用程序,因为RestTemplate
显然是only in maintenance mode,Pivotal/Spring的人建议使用WebClient
。
我的尝试
据我所知,有些线程是不允许被阻塞的,也就是异常中的netty-nio线程。我试着从我的依赖项中删除netty,这样我就可以只依赖Tomcat了。然而,这似乎没有帮助,因为我得到了另一个异常,解释了我,没有合适的HttpConnector
存在(WebClient.Builder抛出的异常)
No suitable default ClientHttpConnector found
java.lang.IllegalStateException: No suitable default ClientHttpConnector found
at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.initConnector(DefaultWebClientBuilder.java:297)
at org.springframework.web.reactive.function.client.DefaultWebClientBuilder.build(DefaultWebClientBuilder.java:266)
at com.company.project.RestClientUsingWebClient.getWebclient(RestClientUsingWebClient.java:160)
我也尝试过在单元测试中执行我的代码,就像启动整个Spring上下文一样。不幸的是结果是一样的。
设置
为了提供更多的细节,下面是前面提到的类的片段。为了更好地理解正在发生的事情,这些类没有完全显示。所有必要的方法都被实现了(例如,阅读器中的canRead()
)。
CustomHttpMessageReader
的第一个字符
我还在类中包括了CustomPart
的用法(除了CustomClass
),只是为了说明Part
的内容也是读的,即被阻止的。
public class CustomHttpMessageReader implements HttpMessageReader<CustomClass> {
private final DefaultPartHttpMessageReader defaultPartHttpMessageReader = new DefaultPartHttpMessageReader();
@Override
public Flux<CustomClass> read(final ResolvableType elementType, final ReactiveHttpInputMessage message,
final Map<String, Object> hints) {
return Flux.merge(readMono(elementType, message, hints));
}
@Override
public Mono<CustomClass> readMono(final ResolvableType elementType, final ReactiveHttpInputMessage message,
final Map<String, Object> hints) {
final List<CustomPart> customParts = readMultipartData(message);
return convertToCustomClass(customParts);
}
private List<CustomPart> readMultipartData(final ReactiveHttpInputMessage message) {
final ResolvableType resolvableType = ResolvableType.forClass(byte[].class);
return Optional.ofNullable(
defaultPartHttpMessageReader.read(resolvableType, message, Map.of())
.buffer()
.blockFirst()) // <- EXCEPTION IS THROWN HERE!
.orElse(new ArrayList<>())
.stream()
.map(part -> {
final byte[] content = Optional.ofNullable(part.content().blockFirst()) //<- HERE IS ANOTHER BLOCK
.map(DataBuffer::asByteBuffer)
.map(ByteBuffer::array)
.orElse(new byte[]{});
// Here we cherry pick some header fields
return new CustomPart(content, someHeaderFields);
}).collect(Collectors.toList());
}
}
WebClient
的用法
class RestClientUsingWebClient {
/**
* The "Main" Method for our purposes
*/
public Optional<CustomClass> getResource(final String baseUrl, final String id) {
final WebClient webclient = getWebclient(baseUrl);
//curl -X GET "http://BASE_URL/id" -H "accept: multipart/form-data"
return webclient.get()
.uri(uriBuilder -> uriBuilder.path(id).build()).retrieve()
.toEntity(CustomClass.class)
.onErrorResume(NotFound.class, e -> Mono.empty())
.blockOptional() // <- HERE IS ANOTHER BLOCK
.map(ResponseEntity::getBody);
}
//This exists also as a Bean definition
private WebClient getWebclient(final String baseUrl) {
final ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(codecs -> {
codecs.defaultCodecs().maxInMemorySize(16 * 1024 * 1024);
codecs.customCodecs().register(new CustomHttpMessageReader()); // <- Our custom reader
})
.build();
return WebClient.builder()
.baseUrl(baseUrl)
.exchangeStrategies(exchangeStrategies)
.build();
}
}
build.gradle
的用法
为了完整起见,下面是我认为是我的build.gradle
的相关部分
plugins {
id 'org.springframework.boot' version '2.7.2'
id 'io.spring.dependency-management' version '1.0.13.RELEASE'
...
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-web' // <- This
implementation 'org.springframework.boot:spring-boot-starter-webflux'
// What I tried:
// implementation ('org.springframework.boot:spring-boot-starter-webflux'){
// exclude group: 'org.springframework.boot', module: 'spring-boot-starter-reactor-netty'
//}
...
}
1条答案
按热度按时间2g32fytz1#
如果我们查看您提供的堆栈跟踪,我们会看到以下3行
它们应该从下往上读。那么它们告诉我们什么呢?
最下面一行告诉我们,
CustomHttpMessageReader.java
类中101
行上的函数readMono
首先被调用。然后,该函数调用
CustomHttpMessageReader
类(与上面的类相同)中116
行上的函数readMultipartData
然后在
Flux
类的2600
行调用函数blockFirst
。这是你的阻塞电话。
因此,我们可以知道函数
readMultipartData
中存在阻塞调用。那么为什么我们不能在函数中阻止呢?如果我们在API中查找函数覆盖HttpMessageReader的接口,我们可以看到函数返回
Mono<T>
,这意味着函数是async
函数。如果它是
async
,而我们是block
,我们可能会得到非常非常差的性能。此接口在
Spring WebClient
中使用,Spring WebClient
是一个完整的async
客户端。您可以在非异步应用程序中使用它,但这意味着您可以在
WebClient
外部阻塞,但在内部,如果您希望它尽可能高效,它需要完全异步运行。因此,最重要的一点是,您不应该阻塞任何返回
Mono
或Flux
的函数。