我有一些原始数据
{
{
"id":1,
"message":"intercept_log,UDP,0.0.0.0,68,255.255.255.255,67"
},
{
"id":2,
"message":"intercept_log,TCP,172.22.96.4,52085,239.255.255.250,3702,1:"
},
{
"id":3,
"message":"intercept_log,UDP,1.0.0.0,68,255.255.255.255,67"
},
{
"id":4,
"message":"intercept_log,TCP,173.22.96.4,52085,239.255.255.250,3702,1:"
}
}
需求
我想根据消息的消息部分的值对数据进行分组。这样的产值
{
{
"GroupValue":"TCP",
"DocCount":"2"
},
{
"GroupValue":"UDP",
"DocCount":"2"
}
}
试试看
我试过这些代码,但失败了
GET systemevent*/_search
{
"size": 0,
"aggs": {
"tags": {
"terms": {
"field": "message.keyword",
"include": " intercept_log[,,](.*?)[,,].*?"
}
}
},
"track_total_hits": true
}
现在我尝试使用管道来满足这个需求。
“aggs”似乎只对字段进行分组。
有人有更好的主意吗?
链接
术语聚合
更新
我的场景有点特别。我从许多不同的服务器收集日志,然后将日志导入es。因此,消息字段之间有很大的区别。如果直接使用脚本语句进行分组统计,将导致分组失败或分组不准确。我尝试根据条件筛选出一些数据,然后使用脚本对操作代码(注解代码1)进行分组,但此代码无法对正确的结果进行分组。
这是我要添加的场景:
我们的团队使用es分析服务器日志,使用rsyslog将数据转发到服务器中心,然后使用logstash将数据过滤并提取到es。此时,es中有一个名为message的字段,message的值是详细的日志信息。此时,我们需要计算消息中包含一些值的数据。
注解代码1
POST systemevent*/_search
{
"size": 0,
"query": {
"bool": {
"must": [
{
"match_phrase": {
"message": {
"query": "intercept_log"
}
}
}
]
}
},
"aggs": {
"protocol": {
"terms": {
"script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A'",
"size": 10
}
}
},
"track_total_hits": true
}
注解代码2
POST test2/_search
{
"size": 0,
"aggs": {
"protocol": {
"terms": {
"script": "def values = /.*,.*/.matcher( doc['host.keyword'].value ); if( name.matches() ) {return values.group(1) } else { return 'N/A' }",
"size": 10
}
}
}
}
1条答案
按热度按时间qvtsj1bj1#
解决这个问题的最简单方法是利用
terms
聚合。脚本只需在逗号上拆分,然后取第二个值。使用正则表达式
结果看起来
一个更好、更有效的方法是将
message
使用摄取管道或日志存储将字段转换为新字段。