从kafka读取日志消息并从我的reactive spring应用程序存储到elastic search中,得到以下两个错误:
1. reactor.retry.RetryExhaustedException:
org.springframework.web.client.HttpServerErrorException:
503 POST request to /hotels-transactional-logs-local-2020.11.17/_doc
returned error code 503.
2. reactor.retry.RetryExhaustedException:
io.netty.handler.timeout.ReadTimeoutException
我的索引设置:
{
"logs-dev-2020.11.17": {
"settings": {
"index": {
"highlight": {
"max_analyzed_offset": "5000000"
},
"number_of_shards": "3",
"provided_name": "logs-dev-2020.11.17",
"creation_date": "1604558592095",
"number_of_replicas": "2",
"uuid": "wjIOSfZOSLyBFTt1cT-whQ",
"version": {
"created": "7020199"
}
}
}
}
}
在spring webclient中保存方法:
public Mono<Log> saveDataIntoIndex(Log esLog, String index) {
return reactiveElasticsearchOperations.save(esLog, index, "_doc")
.publishOn(Schedulers.newElastic("es", 5))
.retryWhen(
Retry.anyOf(Exception.class)
.randomBackoff(Duration.ofSeconds(5), Duration.ofMinutes(1))
.retryMax(2)
);
}
reactiveelasticsearchoperations创建代码:
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchEndpoints.split(","))
.withWebClientConfigurer(webClient -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1))
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
})
.build();
return ReactiveRestClients.create(clientConfiguration);
}
@Bean
public ElasticsearchConverter elasticsearchConverter() {
return new MappingElasticsearchConverter(elasticsearchMappingContext());
}
@Bean
public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
return new SimpleElasticsearchMappingContext();
}
@Bean
public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter());
}
其他信息:
ElasticSearch7.2.1版
集群健康状况良好,它们是集群中的3个节点
索引将每天创建,每个索引有3个碎片
它是在进行负荷试验时发生的。
How do we control this?
Somewhere I need to slow down the read and write? If so, how can I do?
我已尝试增加读取超时秒数,但问题仍然存在
带锁定超时(持续时间秒(100))
@Bean
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchEndpoints.split(","))
.withSocketTimeout(Duration.ofSeconds(100)) //the read timeout, max amount of time the client should wait while receiving no data from the server and the response is incomplete
.withWebClientConfigurer(webClient -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1))
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
})
.build();
return ReactiveRestClients.create(clientConfiguration);
}
我浏览了很多网站。没有解决办法。请帮忙。
暂无答案!
目前还没有任何答案,快来回答吧!