我想写一个API在收到请求后返回202 Accepted。然后在那之后,它应该每秒发送n个Post请求,持续halt-time秒。这是我在这个问题上的尝试。
@PostMapping(value = "/stress")
@ResponseStatus(HttpStatus.ACCEPTED)
private Mono<?> stressPostLog(@RequestBody StressTest request) throws InterruptedException {
return Mono.fromCallable(() -> {
sendBatchRequest(request);
return null;
}).subscribeOn(Schedulers.boundedElastic());
}
private void sendBatchRequest(StressTest requestParams) throws InterruptedException {
successfulRequest.set(0);
long haltTimeMillis = requestParams.getHalt_time() * 1000;
log.info("Halt Time Millis = " + haltTimeMillis );
Integer count = 0;
long initTime = System.currentTimeMillis();
long currTime;
do {
Thread.sleep(1000); //! Sleep time is subjected to system
for(int i = 0; i < requestParams.getRequest_per_second(); i++)
{
AuditSchema request = AuditSchema.generateRandomValue(requestParams.getRequest_size());
Mono<?> response = postAndIncrement(request);
response.publishOn(Schedulers.boundedElastic()).subscribe();
count++;
}
currTime = System.currentTimeMillis();
log.info("Time duration = " + ( currTime - initTime));
}while((currTime - initTime) < haltTimeMillis);
log.info("Request Send = {} and Response Succeed = {}", count, successfulRequest);
}
private Mono<?> postAndIncrement(AuditSchema request) {
return this.client.post()
.uri("v1/log")
.body(BodyInserters.fromValue(request))
.accept(MediaType.APPLICATION_JSON)
.exchangeToMono(clientResponse -> {
log.info("Request suceeded {}", clientResponse.statusCode());
successfulRequest.getAndIncrement();
return Mono.empty();
});
}
字符串
但是,调用不会在5s内执行,并且在5s后会因错误而终止
java.lang.IllegalStateException: Timeout on blocking read for 5000000000 NANOSECONDS
型
如何使用spring webflux编写此代码。
2条答案
按热度按时间ndasle7k1#
使用
fromCallable
并不意味着Callable
会异步执行。请改用fromFuture
:字符串
如果需要,可以为
runAsync
提供一个Executor
(或ExecutorService
),而不是使用公共的fork-join池。sr4lhrrt2#
要解决HTTP 202的问题,请修改方法以返回
void
,并让异步处理在后台进行。