pyspark 我可以在spark map方法中查询elasticsearch吗?

x7yiwoj4  于 2022-11-21  发布在  Spark
关注(0)|答案(1)|浏览(149)

我可以这样查询来自Spark的elasticsearch:

spark.read.format(
    "es"
).options(
    **{
        "es.index.auto.create": "true",
        'es.resource': index_name,
        'es.nodes.wan.only': 'true',
        'es.nodes': elasticsearch_host,
        'es.port': elasticsearch_port,
        'es.net.http.auth.user': elasticsearch_user,
        'es.net.http.auth.pass': elasticsearch_password,
        'es.query': query
    }
).load()

但是我怎样才能在map方法里面访问es呢?
就像这样:

df.rdd.map(
 lambda x: query_es({"match": {"name": x[1]}})
)
yzuktlbb

yzuktlbb1#

昨天我自己解决了这个问题。解决方法相对简单。

df.rdd.map(
    lambda x: ElasticSearch().search(index=index, query={"match": {"name": x[1]}})
)

是的,只要新建一个ElasticSearch()对象就可以了。如果你在这一步中遇到了障碍,比如Connection Error等。试着设置xpack.security.enabled=false并将协议从https修改为http

相关问题