Spring Boot +ElasticSearch:与Java REST高级客户端的连接被拒绝

qf9go6mv  于 2022-12-29  发布在  Spring
关注(0)|答案(3)|浏览(520)

我正在为我的spring Boot 项目实现一个弹性池,我也在使用spring boot 2.1.4和ElasticSearch7.3.0。我被困在这个问题上了。当任何API试图查询它时,它会给出java.net.ConnectException: Connection refused。我想使用customizeHttpClient的配置,并设置线程计数。因此,当应用程序启动时,它只建立一个连接,并使用该连接查询数据库,直到bean被销毁。
我尝试使用以下弹性配置

import java.io.IOException;
    
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.apache.http.impl.nio.reactor.IOReactorConfig;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ElasticConfig{
    
        public static String host;
    
        private static String port;
    
        private static String protocol;
    
        private static String username;
    
        private static String password;
    
        private RestHighLevelClient client;
    
        @Value("${dselastic.host}")
        public void setHost(String value) {
            host = value;
        }
    
        @Value("${dselastic.port}")
        public void setPort(String value) {
            port = value;
        }
    
        @Value("${dselastic.protocol}")
        public void setProtocol(String value) {
            protocol = value;
        }
    
        @Value("${dselastic.username}")
        public void setUsername(String value) {
            username = value;
        }
    
        @Value("${dselastic.password}")
        public void setPassword(String value) {
            password = value;
        }
    
        @Bean(destroyMethod = "cleanUp")
        @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON)
        public void prepareConnection() {
            RestClientBuilder restBuilder = RestClient.builder(new HttpHost(host, Integer.valueOf(port), protocol));
            if (username != null & password != null) {
                final CredentialsProvider creadential = new BasicCredentialsProvider();
                creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
    
                        return httpClientBuilder.setDefaultCredentialsProvider(creadential)
                                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
                    }
                });
                restBuilder.setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
                        .setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
                        .setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.
                client = new RestHighLevelClient(restBuilder);
            }
        }
    
        public void cleanUp() {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

我也尝试过实现DisposableBean接口及其destroy方法,但遇到了相同的异常。
这是我的API,我尝试在其中查询文档:

public class IndexNameController {
    
        @Autowired
        RestHighLevelClient client;
    
        @GetMapping(value = "/listAllNames")
        public ArrayList<Object> listAllNames(HttpSession session) {
            ArrayList<Object> results = new ArrayList<>();
            try {
                SearchRequest searchRequest = new SearchRequest();
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchRequest.indices("indexname");
                String[] fields = { "name", "id" };
                searchSourceBuilder.fetchSource(fields, new String[] {});
                searchSourceBuilder.query(QueryBuilders.matchAllQuery()).size(10000);
                searchSourceBuilder = searchSourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.DESC));
                searchRequest.source(searchSourceBuilder);
                SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                for (SearchHit searchHit : searchHits) {
                    Map<String, Object> map = new HashMap<>();
                    map.put("value", searchHit.getSourceAsMap().get("id"));
                    map.put("name", searchHit.getSourceAsMap().get("name"));
                    results.add(map);
                }
                return results;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return new ArrayList<>();
        }
    }

当它试图查询时,在client.search()处出现异常。这是堆栈跟踪

java.net.ConnectException: Connection refused
        at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:788)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:218)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:205)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1454)
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)
        at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:930)
        at com.incident.response.controller.IncidentController.listAllIncidents(IncidentController.java:569)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
        at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
        at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
        at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
        ... 1 more

请帮助我摆脱这一点。所有的帮助和建议将不胜感激。

mkshixfv

mkshixfv1#

  • 我研究了很多,如何摆脱这一点,我尝试了许多解决方案后,最后我会得到解决方案,所有的事情都在工作,因为我想要的。
  • 我试图实现一个弹性池,一个客户端使用的整个项目的任何查询或聚合,当bean将销毁将关闭,我的整个查询是通过使用只有一个连接。
  • 我将我的弹性配置更改为:
@Configuration
    public class ElasticConfig {

        @Autowired
        Environment environment;

        private RestHighLevelClient client;

        @Bean
        public RestHighLevelClient prepareConnection() {
            RestClientBuilder restBuilder = RestClient
                    .builder(new HttpHost(environment.getProperty("zselastic.host").toString(),
                            Integer.valueOf(environment.getProperty("zselastic.port").toString()),
                            environment.getProperty("zselastic.protocol").toString()));
            String username = new String(environment.getProperty("zselastic.username").toString());
            String password = new String(environment.getProperty("zselastic.password").toString());
            if (username != null & password != null) {
                final CredentialsProvider creadential = new BasicCredentialsProvider();
                creadential.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
                restBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

                        return httpClientBuilder.setDefaultCredentialsProvider(creadential)
                                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(1).build());
                    }
                });

                restBuilder.setRequestConfigCallback(requestConfigBuilder -> 
                requestConfigBuilder.setConnectTimeout(10000) // time until a connection with the server is established.
                        .setSocketTimeout(60000) // time of inactivity to wait for packets[data] to receive.
                        .setConnectionRequestTimeout(0)); // time to fetch a connection from the connection pool 0 for infinite.

                client = new RestHighLevelClient(restBuilder);
                return client;
            }
            return null;
        }

        /*
         * it gets called when bean instance is getting removed from the context if
         * scope is not a prototype
         */
        /*
         * If there is a method named shutdown or close then spring container will try
         * to automatically configure them as callback methods when bean is being
         * destroyed
         */
        @PreDestroy
        public void clientClose() {
            try {
                this.client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  • 所以现在我的bean返回RestHighLevelClient,它将被整个项目使用,我得到了每个API的响应,而不是java.net.ConnectException: Connection refused
  • 另外,我使用这个http://host:port/_nodes/stats/http检查节点统计信息。当我的Sping Boot 应用程序启动时,它将启动一个到elasticsearch的连接,并将一个条目添加到current_open中。在此之后,连接将不会增加,直到我的应用程序运行完所有查询。并且仅通过使用该连接来执行聚集。当我的应用程序将关闭或停止连接将被关闭,并从current_open中删除条目。
  • 所以现在我可以得出结论,我通过使用此配置应用了弹性池
quhf5bfb

quhf5bfb2#

在我的例子中,我忘了在创建Bean的方法中添加@Bean注解,请遵循AbstractElasticsearchConfiguration的实现类。

package org.lauksas.elasticsearch.configuration;

import java.time.Duration;

import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchEntityMapper;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.EntityMapper;
import org.springframework.http.HttpHeaders;

@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {

  @Value("${elasticsearch.host}")
  private String host;
  @Value("${elasticsearch.port}")
  private int port;
  @Value("${elasticsearch.username}")
  private String username;
  @Value("${elasticsearch.password}")
  private String password;

  Logger log = LoggerFactory.getLogger(getClass());

  @Bean
  @Override
  public EntityMapper entityMapper() {
    ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(),
        new DefaultConversionService());
    entityMapper.setConversions(elasticsearchCustomConversions());

    return entityMapper;
  }

  @Override
  @Bean
  public RestHighLevelClient elasticsearchClient() {
    HttpHeaders headers = new HttpHeaders();
    headers.setBasicAuth(username, password);

    final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
        .usingSsl().withBasicAuth(username, password).withSocketTimeout(Duration.ofMinutes(10))
        .build();

    return RestClients.create(clientConfiguration).rest();
  }

  @Bean
  public RestClient restClient() {
    HttpHeaders headers = new HttpHeaders();
    headers.setBasicAuth(username, password);

    final ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo(host + ":" + port)
        .usingSsl().withBasicAuth(username, password).build();

    return RestClients.create(clientConfiguration).lowLevelRest();
  }

  @Bean
  @Primary
  public ElasticsearchOperations elasticsearchTemplate() {
    return elasticsearchOperations();
  }
}
gupuwyp2

gupuwyp23#

对我来说,早些时候我使用RestClientBuilder,后来我开始使用RestHighLevelClient,这帮助我摆脱了超时问题。

相关问题