我想在flink上使用elastic producer,但在身份验证方面遇到了一些问题:我的elastic search集群前面有nginx,在nginx中使用basic auth。
但使用ElasticSearch连接器,我无法在url中添加基本身份验证(因为inetsocketaddress)
你有没有想过用ElasticSearchConnector和basic auth?
谢谢你的时间。
这是我的密码:
val configur = new java.util.HashMap[String, String]
configur.put("cluster.name", "cluster")
configur.put("bulk.flush.max.actions", "1000")
val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("cluster.com"), 9300))
jsonOutput.filter(_.nonEmpty).addSink(new ElasticsearchSink(configur,
transportAddresses,
new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val jsonMap = parse(element).values.asInstanceOf[java.util.HashMap[String, String]]
return Requests.indexRequest()
.index("flinkTest")
.source(jsonMap);
}
override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
indexer.add(createIndexRequest(element))
}
}))
2条答案
按热度按时间u5rb5r591#
flink使用elasticsearch传输客户端,该客户端在端口9300上使用二进制协议进行连接。nginx代理位于端口9200上的http接口前面。
flink不会使用你的代理,所以不需要提供身份验证。
igsr9ssn2#
如果您需要使用http客户机将flink与elasticsearch连接起来,一种解决方案是使用jest库。
您必须创建一个自定义函数,如以下基本java类:
请注意,还可以使用批量操作来提高速度。
本文的“将flink连接到amazonrs”部分描述了flink的jest实现示例