ElasticSearchPython API:通过查询删除文档

nwlls2ji  于 2022-11-02  发布在  ElasticSearch
关注(0)|答案(5)|浏览(159)

我看到下面的API将在Elasticsearch -http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html中执行按查询删除
但是我想对ElasticSearch批量API做同样的事情,尽管我可以使用

es.bulk(body=json_batch)

我不确定如何使用python bulk API调用deletebyquery进行ElasticSearch。

mcvgt66p

mcvgt66p1#

elasticsearch-py批量API允许您通过在每条记录中包含'_op_type': 'delete'来批量删除记录。但是,如果您希望通过查询删除,则仍需要进行两个查询:一个用于获取要删除的记录,另一个用于删除这些记录。
批量执行此操作最简单的方法是使用python模块的scan()帮助器,它 Package 了ElasticSearch Scroll API,因此您不必跟踪_scroll_id。将它与bulk()帮助器一起使用,以替换已弃用的delete_by_query()

from elasticsearch.helpers import bulk, scan

bulk_deletes = []
for result in scan(es,
                   query=es_query_body,  # same as the search() body parameter
                   index=ES_INDEX,
                   doc_type=ES_DOC,
                   _source=False,
                   track_scores=False,
                   scroll='5m'):

    result['_op_type'] = 'delete'
    bulk_deletes.append(result)

bulk(elasticsearch, bulk_deletes)

由于传递了_source=False,因此不会返回文档主体,因此每个结果都非常小。但是,如果您有内存约束,则可以非常容易地对它进行批处理:

BATCH_SIZE = 100000

i = 0
bulk_deletes = []
for result in scan(...):

    if i == BATCH_SIZE:
        bulk(elasticsearch, bulk_deletes)
        bulk_deletes = []
        i = 0

    result['_op_type'] = 'delete'
    bulk_deletes.append(result)

    i += 1

bulk(elasticsearch, bulk_deletes)
f1tvaqid

f1tvaqid2#

看到elasticsearch是如何弃用delete by query API的,我使用绑定创建了this python script来做同样的事情。首先定义一个ES连接:

import elasticsearch
es = elasticsearch.Elasticsearch(['localhost'])

现在,您可以使用它来为要删除的结果创建查询。

search=es.search(
    q='The Query to ES.',
    index="*logstash-*",
    size=10,
    search_type="scan",
    scroll='5m',
)

现在,您可以循环滚动查询。在执行查询的同时生成请求。

while True:
    try: 
      # Git the next page of results. 
      scroll=es.scroll( scroll_id=search['_scroll_id'], scroll='5m', )
    # Since scroll throws an error catch it and break the loop. 
    except elasticsearch.exceptions.NotFoundError: 
      break 
    # We have results initialize the bulk variable. 
    bulk = ""
    for result in scroll['hits']['hits']:
      bulk = bulk + '{ "delete" : { "_index" : "' + str(result['_index']) + '", "_type" : "' + str(result['_type']) + '", "_id" : "' + str(result['_id']) + '" } }\n'
    # Finally do the deleting. 
    es.bulk( body=bulk )

要使用批量API,您需要确保以下两点:
1.已标识要更新的文档。(索引,类型,ID)
1.每个请求都以换行符或/n终止。

3zwtqj6y

3zwtqj6y3#

我现在使用的这个脚本是基于@drs响应的,但是使用bulk()helper是一致的。它可以通过使用chunk_size参数(默认值为500,更多信息请参见straming_bulk())从迭代器创建批量作业。

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk

BULK_SIZE = 1000

def stream_items(es, query):
    for e in scan(es, 
                  query=query, 
                  index=ES_INDEX,
                  doc_type=ES_DOCTYPE, 
                  scroll='1m',
                  _source=False):

        # There exists a parameter to avoid this del statement (`track_source`) but at my version it doesn't exists.
        del e['_score']
        e['_op_type'] = 'delete'
        yield e

es = Elasticsearch(host='localhost')
bulk(es, stream_items(es, query), chunk_size=BULK_SIZE)
sauutmhj

sauutmhj4#

虽然在操作上等效于许多其他答案,但我个人认为以下语法更容易理解:

import elasticsearch
from elasticsearch.helpers import bulk

es = elasticsearch.Elasticsearch(['localhost'])

ids = [1,2,3, ...]      # list of ids that will be deleted
index = "foo_index"     # index where the documents are indexed

actions = ({
    "_id": _id,
    "_op_type": "delete"
} for _id in ids)

bulk(client=es, actions=actions, index=index, refresh=True)

# `refresh=True` makes the result immediately available
h4cxqtbf

h4cxqtbf5#

谢谢,这真的很有用!
我有两个建议:
1.当使用scroll获取下一页结果时,es.scroll(scroll_id=search['_scroll_id'])应该是上一次scroll返回的_scroll_id,而不是搜索返回的_scroll_id。Elasticsearch并不是每次都更新scroll ID,特别是对于较小的请求(请参见this discussion),所以这段代码可能有用,但并不简单。
1.清除滚动条是很重要的,因为长时间保持搜索上下文打开是有代价的。清除滚动条API - Elasticsearch API文档它们最终会在超时后关闭,但如果你的磁盘空间不足,它可以让你省去很多麻烦。
一个简单的方法是建立一个滚动ID列表(确保删除重复的!),并在最后清除所有内容。

es.clear_scroll(scroll_id=scroll_id_list)

相关问题