如何在Python中使用asyncio从elasticsearch中获取响应?

pes8fvy9  于 2023-03-29  发布在  ElasticSearch
关注(0)|答案(1)|浏览(122)

我尝试使用asyncio从elasticsearch中获取响应。我将查询收集到一个列表中的elastic,然后我迭代该列表,为每个查询做出响应。
这是我的代码:

async def make_response(query, es):

    res = await es.search(index='index_name', query=query, size=10000)
    
    return res

query_list = make_queries_list_db()
async def main():

    list_of_res = []
    for enum, el in enumerate(query_list):
        es = AsyncElasticsearch(port, auth)
        list_of_res.append(make_response(query_list[enum],es))
    await asyncio.gather(*list_of_res)

asyncio.run(main())


这样做,我仍然有错误,如“未关闭的客户端会话”或“连接时间超过”。如何使用asyncio与elasticsearch以正确的方式?

klsxnrf1

klsxnrf11#

所以我遇到了完全相同的问题,并设法从pypi项目https://pypi.org/project/elasticsearch-async/中拼凑出了答案
我的原始代码看起来像这样

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_scan

es = AsyncElasticsearch(
    host="localhost",
    port=9200,
    use_ssl=True,
    verify_certs=False,
    ssl_show_warn=False,
    http_auth=("user", "password"),
)

async def main():
    async for doc in async_scan(
        client=es, query={"query": {"match": {"_id": "baa"}}}, index="foo"
    ):
        print(doc)

loop = asyncio.new_event_loop()
loop.run_until_complete(main())

为了解决这个问题,我简单地在代码末尾添加了以下内容

loop.run_until_complete(es.transport.close())
loop.close()

这并不完全是你的代码,但希望能帮助你指出你需要的方向。

相关问题