ElasticSearch:从索引中删除重复项

camsedfj  于 2022-11-22  发布在  ElasticSearch
关注(0)|答案(3)|浏览(577)

我有一个索引有多个重复的条目。它们有不同的id,但其他字段有相同的内容。
例如:

{id: 1, content: 'content1'}
{id: 2, content: 'content1'}
{id: 3, content: 'content2'}
{id: 4, content: 'content2'}

删除重复项后:

{id: 1, content: 'content1'}
{id: 3, content: 'content2'}

是否有一种方法可以删除所有重复项,只保留一个不同的条目,而无需手动比较所有条目?

rsaldnfx

rsaldnfx1#

这可以通过几种方法来实现,下面我将概述两种可能的方法:
1)如果您不介意生成新的_id值并将所有文档重新索引到一个新集合中,那么您可以使用Logstash和fingerprint过滤器来生成唯一的指纹(散列),并在文档被写入新集合时使用该指纹作为文档的_id。由于_id字段必须是唯一的,具有相同指纹的任何文档将被写入相同的_id,并因此被去重复。
2)您可以编写一个自定义脚本来滚动索引。读取每个文档时,您可以从您认为用于定义唯一文档的字段中创建一个散列(在你情况下,content字段)。然后使用此哈希值作为字典中的键与该键相关联的值将是生成该相同哈希值的文档的所有_id的列表。一旦您有了所有的散列和关联的_id列表,您就可以对与每个相同的散列关联的所有_id执行删除操作,只保留一个。注意,第二种方法不需要为了去重复而将文档写入新的索引,因为您将直接从原始索引中删除文档。
我已经写了一篇博文和代码,在下面的URL中演示了这两种方法:https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/
免责声明:我是一个咨询工程师在弹性。

mtb9vblg

mtb9vblg2#

我使用Rails,如果需要,我将使用FORCE=y命令导入内容,该命令将删除并重新索引该索引和类型的所有内容......但不确定您在什么环境中运行ES。我唯一能看到的问题是,您导入的数据源是否(即数据库)有重复的记录。我想我会先看看数据源是否可以修复,如果可行的话,你重新索引一切;否则您可以尝试创建一个自定义导入方法,该方法只为每条记录的一个重复项建立索引。
此外,我知道这不符合删除重复条目的要求,但是您可以简单地自定义搜索,以便只返回一个重复的id,可以通过最近的“时间戳”或索引已删除重复的数据并按内容字段分组--看看this post helps.即使这样仍然会在索引中保留重复的记录,至少不会出现在搜索结果中。
我还发现了这个:Elasticsearch delete duplicates
我试着为你想了很多可能的情况,看看这些选项是否有效,或者至少可以是一个临时修复。

ghhkc1vu

ghhkc1vu3#

下面是我根据Alexander Marquardt答案创建的脚本。

import hashlib
from elasticsearch import Elasticsearch, helpers

ES_HOST = 'localhost:9200'
es = Elasticsearch([ES_HOST])

def scroll_over_all_docs(index_name='squad_docs'):
    dict_of_duplicate_docs = {}

    index_docs_count = es.cat.count(index_name, params={"format": "json"})
    total_docs = int(index_docs_count[0]['count'])
    count = 0
    for hit in helpers.scan(es, index=index_name):
        
        count += 1
        
        text = hit['_source']['text']
        id = hit['_id']
        hashed_text = hashlib.md5(text.encode('utf-8')).digest()
        
        dict_of_duplicate_docs.setdefault(hashed_text,[]).append(id)
        
        if (count % 100 == 0):
            print(f'Progress: {count} / {total_docs}')

        
    return dict_of_duplicate_docs

def delete_duplicates(duplicates, index_name='squad_docs'):

    for hash, ids in duplicates.items():
    
        if len(ids) > 1:
            
            print(f'Number of docs: {len(ids)}. Number of docs to delete: {len(ids) -1}')
            for id in ids:
                if id == ids[0]:
                    continue
                res = es.delete(index=index_name, doc_type= '_doc', id=id)
                id_deleted = res['_id']
                results = res['result']
                print(f'Document id {id_deleted} status: {results}')

            reminder_doc = es.get(index=index_name, doc_type= '_all', id=ids[0])
            print('Reminder Document:')
            print(reminder_doc)     

def main():
    dict_of_duplicate_docs = scroll_over_all_docs()
    delete_duplicates(dict_of_duplicate_docs)

if __name__ == "__main__":
   main()

相关问题