elasticsearch 如何导出250亿个Elastic Security事件?

lo8azlld  于 2023-10-17  发布在  ElasticSearch
关注(0)|答案(1)|浏览(141)

我是一名MSSP,负责处理一个有250亿个事件的客户。每个事件都需要备份以确保合规性,我可以通过Python API查询索引并提取每个事件的JSON,但我使用的脚本将花费大量时间。有没有更好的方法来做到这一点?目前,我只是使用一个快速和肮脏的脚本来验证,查询信息,存储它持久比查询一次。如果时间到了,我打算用searchAfter来修改脚本,这最终会被 Package 在一个bash脚本中。此外,searchAfter也不起作用,但如果我沿着这条路线走,我可以稍后修复它。也为代码的混乱道歉。

#!/usr/bin/env python3

from elasticsearch import Elasticsearch
import sys
import time

username = "USER"
password = "REDACTED"

elasticsearchEndpoint = "https://elastic.com:9243"

es = Elasticsearch(
    elasticsearchEndpoint, 
    basic_auth=(username, password), 
    request_timeout=120
)

SAI = "searchAfterIndex.txt"
file_path = "dump.out"
options = ["/rr", "rerun", "rr", "-rr", "--rr", "--rerun", "/rerun", "-rerun"]

query = {
    "bool": {
        "filter": [
            {
                "range": {
                    "@timestamp": {
                        "gte": "now-1y",
                        "lt": "now"
                    }
                }
            }
        ],
        "must": [],
        "must_not": [],
        "should": []
    }
}

def getEvents(searchAfter=None):

    while True:
        if searchAfter is None:
            classVar = es.search(expand_wildcards='all', index="*-abc**", size=10000, sort="@timestamp:asc", query=query) # IDK if I need query=query
        else:
            time.sleep(.025)
            classVar = es.search(expand_wildcards='all', index="*-abc**", size=10000, search_after=searchAfter, sort="@timestamp:asc", query=query)
        
        hits = classVar.get("hits", {}).get("hits", [])

        with open(file_path, "a") as fp:
            for hit in hits:
                fp.write(str(hit))

        # Break condition
        if len(hits) == 10000:
            searchAfter = hits[-1]["sort"]
            with open(SAI, "w") as fp:
                fp.write(str(searchAfter).strip("[]"))
        else:
            break

if __name__ == "__main__":

    if len(sys.argv)>=2 and sys.argv[1] in options:
        try:
            searchAfterIndex = open(SAI, "r").read()
            getEvents(searchAfterIndex)
            print("Done!", file=sys.stderr)
        except Exception as e:
            print(f"An error occurred: {e}", file=sys.stderr)

    try:
        getEvents()
        print("Done!", file=sys.stderr)
    except Exception as e:
        print(f"An error occurred: {e}", file=sys.stderr)

我尝试通过发现选项卡中的共享功能导出,但这经常超时和错误我尝试通过API导出,但这需要很长时间

wqlqzqxt

wqlqzqxt1#

以下是解决此问题的几种方法:
1.你可以通过在PIT上下文中运行脚本并按_shard_doc而不是时间戳排序来提高脚本的性能(顺便说一下,按时间戳排序可能会跳过具有相同时间戳的记录)
1.你可以给予你的脚本,并使用现有的JSON转储工具之一,如elasticdump。
1.不是将所有记录转储为json,而是可以创建整个索引的快照并存储快照。由于这是为了合规性,所以无论如何都没有人会看它,如果他们愿意,他们总是可以再次安装elasticsearch和search these snapshots。它会很慢,但不会像在json转储中爬行那么慢。

相关问题