为什么我的spring mvc(tomcat nio,resthighlevelclient)在负载测试中的表现优于webflux(netty,reactiveelasticsearchclient)?

niwlg2el  于 2021-10-10  发布在  Java
关注(0)|答案(0)|浏览(252)

我不熟悉React式编程和webflux,我正在评估从使用webmvc的servlet堆栈上的spring data elasticsearch应用程序到使用spring webflux的React式堆栈的迁移。
我开发了两个完全相同的简单spring引导应用程序,它们可以使用spring data elasticsearch存储库执行crud操作。
测试是用户索引/将文档保存到elasticsearch。负载测试是500个并发用户,启动时间为20秒,每个用户迭代20次(总共10000个文档)
我原以为netty上的webflux会比tomcat上的mvc表现更好,尤其是在并发用户更多的情况下。但结果恰恰相反。netty的响应时间几乎是tomcat的两倍,我需要在React式客户端中增加maxconnection队列,因为我得到了ReadTimeOutException。那么我做错了什么?
问题:
netty上的webflux不应该能够更好地处理更多并发用户吗?
为什么响应时间如此之高。。。netty的吞吐量更低?
我是否需要以不同的方式配置被动客户端以获得更好的性能?
拥有数百个nio线程的tomcat能处理更多的请求并且比netty事件循环更快吗?
这些应用程序具有以下堆栈:
SpringWebMVC:

<properties>
    <java.version>1.8</java.version>
    <spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
    <spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
    <spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
    <lombok.version>1.18.16</lombok.version>
    <jfairy.version>0.5.9</jfairy.version>
    <elasticsearch.version>7.12.0</elasticsearch.version>
</properties>

tomcat(spring启动程序tomcat:jar:2.4.2:compile)
resthighlevelclient,用于请求elasticsearch

@Configuration
 public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration {

 @Value("${elasticsearch.host:localhost}")
 private String host;

 @Value("${elasticsearch.http.port:9200}")
 private int port;

 @Override
 @Bean
 public RestHighLevelClient elasticsearchClient() {

     final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
             .connectedTo(getHostAndPort())
             .build();

     return RestClients.create(clientConfiguration).rest();
 }

 private String getHostAndPort(){
     return host +":"+ port;
 }
 }

控制器:

@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {

    PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();

    return new ResponseEntity<>(dto, HttpStatus.CREATED);
}

服务:

public PersonDocumentDto indexGeneratedPersonDocument(){

    PersonDocument personDocument = personGenerator.generatePersonDoc();
    PersonDocumentDto personDocumentDto = new PersonDocumentDto();

    try {
        personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
        LOGGER.debug("Document indexed!");
    } catch (Exception e) {
        LOGGER.error("Unable to index document!",e);
    }

    return personDocumentDto;

}

Spring 网络流量:

<properties>
    <java.version>1.8</java.version>
    <spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
    <spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
    <spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
    <reactor-test.version>3.4.2</reactor-test.version>
    <lombok.version>1.18.16</lombok.version>
    <jfairy.version>0.5.9</jfairy.version>
    <elasticsearch.version>7.12.0</elasticsearch.version>
</properties>

netty(springbootstarterreactor-netty:jar:2.4.2:compile)
reactiveelasticsearchclient,用于请求elasticsearch

@Configuration
 public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration {

 @Value("${elasticsearch.host:localhost}")
 private String host;

 @Value("${elasticsearch.http.port:9200}")
 private int port;

 @Override
 @Bean
 public ReactiveElasticsearchClient reactiveElasticsearchClient() {
     ClientConfiguration clientConfiguration = ClientConfiguration.builder()
             .connectedTo(getHostAndPort())
             .withWebClientConfigurer(webClient -> {
                 ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                         .codecs(configurer -> configurer.defaultCodecs()
                                 .maxInMemorySize(-1))
                         .build();
                 String connectionProviderName = "myConnectionProvider";
                 int maxConnections = 1000;
                 HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName, maxConnections));

                 return webClient
                         .mutate()
                         .clientConnector(new ReactorClientHttpConnector(httpClient))
                         .exchangeStrategies(exchangeStrategies)
                         .build();
             })
             .build();

     return ReactiveRestClients.create(clientConfiguration);
 }

 private String getHostAndPort(){
     return host +":"+ port;
 }

}
处理程序:

public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
    return this.service.indexGeneratedPersonDocument()
            .flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
            .onErrorResume(WebClientRequestException.class, e -> ServerResponse
                    .badRequest()
                    .bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));

}

服务:

public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){

    return personGenerator.generatePersonDocument()
            .flatMap(this.repository::save)
            .map(EntityDtoUtil::toDto)
            .doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}

mvc响应百分比:500个用户,20次迭代,共10000个文档
webflux响应百分位数:500个用户,20次迭代,共10000个文档

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题