我不熟悉React式编程和webflux,我正在评估从使用webmvc的servlet堆栈上的spring data elasticsearch应用程序到使用spring webflux的React式堆栈的迁移。
我开发了两个完全相同的简单spring引导应用程序,它们可以使用spring data elasticsearch存储库执行crud操作。
测试是用户索引/将文档保存到elasticsearch。负载测试是500个并发用户,启动时间为20秒,每个用户迭代20次(总共10000个文档)
我原以为netty上的webflux会比tomcat上的mvc表现更好,尤其是在并发用户更多的情况下。但结果恰恰相反。netty的响应时间几乎是tomcat的两倍,我需要在React式客户端中增加maxconnection队列,因为我得到了ReadTimeOutException。那么我做错了什么?
问题:
netty上的webflux不应该能够更好地处理更多并发用户吗?
为什么响应时间如此之高。。。netty的吞吐量更低?
我是否需要以不同的方式配置被动客户端以获得更好的性能?
拥有数百个nio线程的tomcat能处理更多的请求并且比netty事件循环更快吗?
这些应用程序具有以下堆栈:
SpringWebMVC:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
tomcat(spring启动程序tomcat:jar:2.4.2:compile)
resthighlevelclient,用于请求elasticsearch
@Configuration
public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration {
@Value("${elasticsearch.host:localhost}")
private String host;
@Value("${elasticsearch.http.port:9200}")
private int port;
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(getHostAndPort())
.build();
return RestClients.create(clientConfiguration).rest();
}
private String getHostAndPort(){
return host +":"+ port;
}
}
控制器:
@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {
PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();
return new ResponseEntity<>(dto, HttpStatus.CREATED);
}
服务:
public PersonDocumentDto indexGeneratedPersonDocument(){
PersonDocument personDocument = personGenerator.generatePersonDoc();
PersonDocumentDto personDocumentDto = new PersonDocumentDto();
try {
personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
LOGGER.debug("Document indexed!");
} catch (Exception e) {
LOGGER.error("Unable to index document!",e);
}
return personDocumentDto;
}
Spring 网络流量:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<reactor-test.version>3.4.2</reactor-test.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
netty(springbootstarterreactor-netty:jar:2.4.2:compile)
reactiveelasticsearchclient,用于请求elasticsearch
@Configuration
public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration {
@Value("${elasticsearch.host:localhost}")
private String host;
@Value("${elasticsearch.http.port:9200}")
private int port;
@Override
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(getHostAndPort())
.withWebClientConfigurer(webClient -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(-1))
.build();
String connectionProviderName = "myConnectionProvider";
int maxConnections = 1000;
HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName, maxConnections));
return webClient
.mutate()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.exchangeStrategies(exchangeStrategies)
.build();
})
.build();
return ReactiveRestClients.create(clientConfiguration);
}
private String getHostAndPort(){
return host +":"+ port;
}
}
处理程序:
public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
return this.service.indexGeneratedPersonDocument()
.flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
.onErrorResume(WebClientRequestException.class, e -> ServerResponse
.badRequest()
.bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));
}
服务:
public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){
return personGenerator.generatePersonDocument()
.flatMap(this.repository::save)
.map(EntityDtoUtil::toDto)
.doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}
mvc响应百分比:500个用户,20次迭代,共10000个文档
webflux响应百分位数:500个用户,20次迭代,共10000个文档
暂无答案!
目前还没有任何答案,快来回答吧!