我尝试使用Webflux将生成的文件流传输到另一个位置,但是,如果文件的生成遇到错误,api会返回成功,但会使用DTO详细说明生成文件时的错误,而不是文件本身。这是使用一个非常旧且设计糟糕的api,所以请原谅使用post和api设计。
api调用(exchange())的响应是ClientResponse。在这里,我可以使用bodyToMono转换为ByteArrayResource,它可以流传输到文件中,或者,如果创建文件时出错,我也可以使用bodyToMono转换为DTO。但是,我似乎无法根据ClientResponse头的内容执行或。
在运行时,我得到一个由以下原因引起的非法状态异常
block()/blockFirst()/blockLast()是阻塞的,线程React器-http-client-epoll-12不支持这种情况
我认为我的问题是我不能在同一个函数链中调用block()两次。
我的代码片段如下所示:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
基本上,我希望根据标头中定义的MediaType以不同的方式处理ClientResponse。
这可能吗?
6条答案
按热度按时间gmol16391#
首先,有几件事可以帮助您理解解决这个用例的代码片段。
1.绝不应在返回React类型的方法内调用阻塞方法;您将阻塞应用程序的少数线程之一,这对应用程序非常不利
1.无论如何,从React器3.2 blocking within a reactive pipeline throws an error开始
1.正如注解中所建议的那样,调用
subscribe
也不是一个好主意,它或多或少地像在单独的线程中作为一个任务启动该作业,当它完成时,您将得到一个回调(subscribe
方法可以用lambdas来表示),但实际上您是在将当前管道与该任务解耦。在您有机会读取完整的响应正文并将其写入文件之前,客户端HTTP响应可能已关闭,资源可能已清理1.如果您不想在内存中缓冲整个响应,Spring提供了
DataBuffer
(想想可以池化的ByteBuffer示例)。1.如果您实现的方法本身是阻塞的(例如返回
void
),则可以调用block,例如在测试用例中。下面是一个代码片段,您可以使用它来完成此操作:
正如您所看到的,我们没有在任何地方阻塞,处理I/O的方法返回
Mono<Void>
,这是done(error)
回调的React性等价物,它在事情完成时发出信号,如果发生错误。因为我不确定
createErrorFile
方法应该做什么,所以我提供了一个createSpreadsheet
的示例,它只将主体字节写入文件。注意,因为数据缓冲区可能会被回收/池化,所以我们需要在完成后释放它们。通过这种实现,您的应用程序将在给定时间在内存中保存一些
DataBuffer
示例(出于性能原因,React式操作符将预取值),并将以React式方式写入字节。t5fffqht2#
[2021年10月19日更新]
toProcessor()
现已弃用。考虑使用
正如投票最多的答案所指出的,永远不要阻塞。在我的例子中,这是唯一的选择,因为我们在一段命令式代码中使用了一个React式库。阻塞可以通过在处理器中 Package mono来完成:
5fjcxozz3#
要在服务器请求池之外执行客户端请求,请使用
myWebClientMono.share().block();
7uhlpewt4#
尝试
myMono.subscribeOn(Schedulers.boundedElastic()).toFuture().get(5L, TimeUnit.SECONDS)
lokaqttq5#
[更新日期:2023年1月31日]
我想补充一下这个主题,并分享我的解决方案,因为
exchange()
操作符从5.3版本开始就被弃用了。详情:
由于可能泄漏内存和/或连接,自5.3起弃用;请使用exchangeToMono(函数)、exchangeToFlux(函数);还可以考虑使用retrieve(),它通过ResponseEntity提供对响应状态和报头的访问以及错误状态处理。
因此,我将使用retrieve()操作符给出此任务的一个示例,并使用 * streaming * 方法以某种方式简化将文件保存到文件系统的过程。
因为它给了我们访问头部和响应主体的机会,我们可以这样做:
将流
Publisher<DataBuffer>
保存到文件系统的方法:此外,如您所见,使用
DataBufferUtils.write()
,我们可以直接将流写入文件这里我们不使用任何阻塞API,如Input/OutputStream,因此我们不会同时在内存中缓冲文件的整个内容。
ubof19bj6#