如何在flink上使用带ElasticSearch连接器的basicauth

f4t66c6m  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(371)

我想在flink上使用elastic producer,但在身份验证方面遇到了一些问题:我的elastic search集群前面有nginx,在nginx中使用basic auth。
但使用ElasticSearch连接器,我无法在url中添加基本身份验证(因为inetsocketaddress)
你有没有想过用ElasticSearchConnector和basic auth?
谢谢你的时间。
这是我的密码:

val configur = new java.util.HashMap[String, String]

    configur.put("cluster.name", "cluster")

    configur.put("bulk.flush.max.actions", "1000")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("cluster.com"), 9300))

    jsonOutput.filter(_.nonEmpty).addSink(new ElasticsearchSink(configur,
                                                                transportAddresses,
                                                                new ElasticsearchSinkFunction[String] {
      def createIndexRequest(element: String): IndexRequest = {

        val jsonMap = parse(element).values.asInstanceOf[java.util.HashMap[String, String]]

        return Requests.indexRequest()
          .index("flinkTest")
          .source(jsonMap);
      }

      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
        indexer.add(createIndexRequest(element))
      }
    }))
u5rb5r59

u5rb5r591#

flink使用elasticsearch传输客户端,该客户端在端口9300上使用二进制协议进行连接。nginx代理位于端口9200上的http接口前面。
flink不会使用你的代理,所以不需要提供身份验证。

igsr9ssn

igsr9ssn2#

如果您需要使用http客户机将flink与elasticsearch连接起来,一种解决方案是使用jest库。
您必须创建一个自定义函数,如以下基本java类:

package fr.gfi.keenai.streaming.io.sinks.elasticsearch5;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import io.searchbox.client.JestClient;
    import io.searchbox.client.JestClientFactory;
    import io.searchbox.client.config.HttpClientConfig;
    import io.searchbox.core.Index;

    public class ElasticsearchJestSinkFunction<T> extends RichSinkFunction<T> {

        private static final long serialVersionUID = -7831614642918134232L;

        private JestClient client;

        @Override
        public void invoke(T value) throws Exception {

            String document = convertToJsonDocument(value); 

            Index index = new Index.Builder(document).index("YOUR_INDEX_NAME").type("YOUR_DOCUMENT_TYPE").build();
            client.execute(index);

        }

        @Override
        public void open(Configuration parameters) throws Exception {

            // Construct a new Jest client according to configuration via factory
            JestClientFactory factory = new JestClientFactory();
            factory.setHttpClientConfig(new HttpClientConfig.Builder("http://localhost:9200")
                    .multiThreaded(true)
                    // Per default this implementation will create no more than 2 concurrent
                    // connections per given route
                    .defaultMaxTotalConnectionPerRoute(2)
                    // and no more 20 connections in total
                    .maxTotalConnection(20)
                    // Basic username and password authentication
                    .defaultCredentials("YOUR_USER", "YOUR_PASSWORD")
                    .build());
            client = factory.getObject();
        }

        private String convertToJsonDocument(T value) {
            //TODO
            return "{}";
        }

    }

请注意,还可以使用批量操作来提高速度。
本文的“将flink连接到amazonrs”部分描述了flink的jest实现示例

相关问题