我的目标是根据elasticsearch接收到的时间戳对数百万个日志进行排序。
日志示例:
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}
不幸的是,我不能得到所有的日志整理出弹性。看来我得自己做了。
我尝试过的方法是将弹性数据整理出来:
es = Search(index="somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
print(hit['@timestamp'])
另一种方法:
notifications = (es
.query("range",**{
"@timestamp": {
'gte': 'now-48h',
'lt' : 'now'
}
})
.sort("@timestamp")
.scan()
)
因此,我正在寻找一种方法来排序这些日志自己或直接通过elasticsearch。目前,我把所有的数据都保存在一个本地的logs.json中,似乎我必须自己对它进行迭代和排序。
1条答案
按热度按时间n3h0vuf21#
一定要让elasticsearch进行排序,然后将已排序的数据返回给您。
问题是你正在使用
.scan()
,它使用elasticsearch的扫描/滚动api。不幸的是,这只应用于每个页/片上的排序参数,而不是整个搜索结果。这在elasticsearch dsl关于分页的文档中有所说明:分页
...
如果要访问与查询匹配的所有文档,可以使用使用scan/scroll elasticsearch api的扫描方法:
注意,在这种情况下,结果不会被排序。
(我的重点)
使用分页是有意义的,当你有一个“数百万的日志”如你所说,所以不使用分页不是一个选择。你可以用
search_after
分页api:搜索
你可以用
search_after
参数使用上一页中的一组排序值检索下一页的点击。...
要获取结果的第一页,请提交一个带有
sort
争论。...
搜索响应包括
sort
每次命中的值。...
要获得下一页的结果,请使用最后一次点击的结果重新运行上一次搜索
sort
值作为search_after
论点。。。搜索正在进行中query
以及sort
参数必须保持不变。如果提供,则from
参数必须是0
(默认)或-1
....
您可以重复此过程以获得额外的结果页。
(省略了原始json请求,因为下面我将用python显示一个示例)
下面是一个如何使用elasticsearch dsl for python的示例。注意我限制了
fields
以及结果的数量,以便于测试。这里的重要部分是sort
以及extra(search_after=)
.这将导致已排序的数据:
使用时有一些注意事项
search_after
如api文档中所述:使用时间点或坑参数
如果在这些请求之间发生刷新,则结果的顺序可能会更改,从而导致跨页面的结果不一致。为了防止出现这种情况,可以创建一个时间点(pit),以便在搜索中保留当前索引状态。
你需要先做一个post请求来获得一个pit id
然后添加一个
extra
'pit': {'id':x, 'keep_alive':5m}
每个请求的参数确保使用上次响应中的pit id
使用决胜局
我们建议您在排序中包含一个tiebreaker字段。此tiebreaker字段应包含每个文档的唯一值。如果不包含tiebreaker字段,则分页结果可能会丢失或重复命中。
这将取决于您的文档架构