如何使用elasticsearch按@timestamp对分页日志进行排序?

trnvg8h3  于 2021-06-13  发布在  ElasticSearch
关注(0)|答案(1)|浏览(572)

我的目标是根据elasticsearch接收到的时间戳对数百万个日志进行排序。
日志示例:

{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}

不幸的是,我不能得到所有的日志整理出弹性。看来我得自己做了。
我尝试过的方法是将弹性数据整理出来:

es = Search(index="somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
    print(hit['@timestamp'])

另一种方法:

notifications = (es
    .query("range",**{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
        }
    })
    .sort("@timestamp")
    .scan()
)

因此,我正在寻找一种方法来排序这些日志自己或直接通过elasticsearch。目前,我把所有的数据都保存在一个本地的logs.json中,似乎我必须自己对它进行迭代和排序。

n3h0vuf2

n3h0vuf21#

一定要让elasticsearch进行排序,然后将已排序的数据返回给您。
问题是你正在使用 .scan() ,它使用elasticsearch的扫描/滚动api。不幸的是,这只应用于每个页/片上的排序参数,而不是整个搜索结果。这在elasticsearch dsl关于分页的文档中有所说明:

分页

...
如果要访问与查询匹配的所有文档,可以使用使用scan/scroll elasticsearch api的扫描方法:

for hit in s.scan():
    print(hit.title)

注意,在这种情况下,结果不会被排序。
(我的重点)
使用分页是有意义的,当你有一个“数百万的日志”如你所说,所以不使用分页不是一个选择。你可以用 search_after 分页api:

搜索

你可以用 search_after 参数使用上一页中的一组排序值检索下一页的点击。
...
要获取结果的第一页,请提交一个带有 sort 争论。
...
搜索响应包括 sort 每次命中的值。
...
要获得下一页的结果,请使用最后一次点击的结果重新运行上一次搜索 sort 值作为 search_after 论点。。。搜索正在进行中 query 以及 sort 参数必须保持不变。如果提供,则 from 参数必须是 0 (默认)或 -1 .
...
您可以重复此过程以获得额外的结果页。
(省略了原始json请求,因为下面我将用python显示一个示例)
下面是一个如何使用elasticsearch dsl for python的示例。注意我限制了 fields 以及结果的数量,以便于测试。这里的重要部分是 sort 以及 extra(search_after=) .

search = Search(using=client, index='some-index')

# The main query

search = search.extra(size=100)
search = search.query('range',**{'@timestamp': {'gte': '2020-12-29T09:00', 'lt': '2020-12-29T09:59'}})
search = search.source(fields=('@timestamp', ))
search = search.sort({
    '@timestamp': {
        'order': 'desc'
    },
})

# An iterator would be more efficient

hits = []

# Get the 1st page

results = search.execute()
hits.extend(results.hits)
total = results.hits.total
print(f'Expecting {total}')

# Get the next pages

# Real use-case condition should be "until total" or "until no more results.hits"

while len(hits) < 1000:  
    print(f'Now have {len(hits)}')
    last_hit_sort_id = hits[-1].meta.sort[0]
    search = search.extra(search_after=[last_hit_sort_id])
    results = search.execute()
    hits.extend(results.hits)

with open('results.txt', 'w') as out:
    for hit in hits:
        out.write(f'{hit["@timestamp"]}\n')

这将导致已排序的数据:


# 1st 10 lines

2020-12-29T09:58:57.749Z
2020-12-29T09:58:55.736Z
2020-12-29T09:58:53.627Z
2020-12-29T09:58:52.738Z
2020-12-29T09:58:47.221Z
2020-12-29T09:58:45.676Z
2020-12-29T09:58:44.523Z
2020-12-29T09:58:43.541Z
2020-12-29T09:58:40.116Z
2020-12-29T09:58:38.206Z
...

# 250-260

2020-12-29T09:50:31.117Z
2020-12-29T09:50:27.754Z
2020-12-29T09:50:25.738Z
2020-12-29T09:50:23.601Z
2020-12-29T09:50:17.736Z
2020-12-29T09:50:15.753Z
2020-12-29T09:50:14.491Z
2020-12-29T09:50:13.555Z
2020-12-29T09:50:07.721Z
2020-12-29T09:50:05.744Z
2020-12-29T09:50:03.630Z 
...

# 675-685

2020-12-29T09:43:30.609Z
2020-12-29T09:43:30.608Z
2020-12-29T09:43:30.602Z
2020-12-29T09:43:30.570Z
2020-12-29T09:43:30.568Z
2020-12-29T09:43:30.529Z
2020-12-29T09:43:30.475Z
2020-12-29T09:43:30.474Z
2020-12-29T09:43:30.468Z
2020-12-29T09:43:30.418Z
2020-12-29T09:43:30.417Z
...

# 840-850

2020-12-29T09:43:27.953Z
2020-12-29T09:43:27.929Z
2020-12-29T09:43:27.927Z
2020-12-29T09:43:27.920Z
2020-12-29T09:43:27.897Z
2020-12-29T09:43:27.895Z
2020-12-29T09:43:27.886Z
2020-12-29T09:43:27.861Z
2020-12-29T09:43:27.860Z
2020-12-29T09:43:27.853Z
2020-12-29T09:43:27.828Z
...

# Last 3

2020-12-29T09:43:25.878Z
2020-12-29T09:43:25.876Z
2020-12-29T09:43:25.869Z

使用时有一些注意事项 search_after 如api文档中所述:
使用时间点或坑参数
如果在这些请求之间发生刷新,则结果的顺序可能会更改,从而导致跨页面的结果不一致。为了防止出现这种情况,可以创建一个时间点(pit),以便在搜索中保留当前索引状态。
你需要先做一个post请求来获得一个pit id
然后添加一个
extra 'pit': {'id':x, 'keep_alive':5m} 每个请求的参数
确保使用上次响应中的pit id
使用决胜局
我们建议您在排序中包含一个tiebreaker字段。此tiebreaker字段应包含每个文档的唯一值。如果不包含tiebreaker字段,则分页结果可能会丢失或重复命中。
这将取决于您的文档架构


# Add some ID as a tiebreaker to the `sort` call

search = search.sort(
    {'@timestamp': {
        'order': 'desc'
    }},
    {'some.id': {
        'order': 'desc'
    }}
)

# Include both the sort ID and the some.ID in `search_after`

last_hit_sort_id, last_hit_route_id = hits[-1].meta.sort
search = search.extra(search_after=[last_hit_sort_id, last_hit_route_id])

相关问题