如何在ElasticSearch中按日志字符串的特定部分对日志进行分组?

snz8szmq  于 2023-03-29  发布在  ElasticSearch
关注(0)|答案(1)|浏览(164)

我正在ElasticSearch中对我的日志进行分析,并试图找出哪10个用户访问某个端点的次数最多。
我的日志格式是:

API request: [GET] /api/v1/person from user: 4fe5e06a-6h33-4661-a9ab-ee8d82523ca7

我想要一个类似于以下的分组:

user_id, requests_made
4fe5e06a-6h33-4661-a9ab-ee8d82523ca7, 59
6fe4e06a-6h35-4231-a1al-sbfe0018mco0, 52
.....

如果我事先不知道用户ID的**,我如何计算出每个唯一用户ID**存在多少相同的日志?(即,我不知道哪个用户ID将拥有最多的日志,我需要生成该洞察力)
我正在通过API请求进行搜索:

GET _search
{
  "aggs": {
    }
  }
}
insrf1ej

insrf1ej1#

由于您没有提到如何将数据索引到Elasticsearch,并且考虑到您无法更改Map或重新索引,因此给出了以下方法。
你可以使用Elasticsearch中的runtime field来实现你想要的输出。
假设你有message字段存储你的日志,那么你可以使用下面的查询来生成输出:

POST 75838359/_search
{
  "fields": [
    "userid"
  ], 
  "runtime_mappings": {
    "userid": {
      "type": "keyword",
      "script": """
        String userid=grok('%{GREEDYDATA:msg}:%{SPACE}%{GREEDYDATA:userid}').extract(doc["message.keyword"].value)?.userid;
        if (userid != null) emit(userid); 
      """
    }
  },
  "aggs": {
    "userdetails": {
      "terms": {
        "field": "userid",
        "size": 10
      }
    }
  }
}

上述查询将返回以下结果:

"hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "75838359",
        "_id": "ufRCI4cBYGx8nLsbCifL",
        "_score": 1,
        "_source": {
          "message": "API request: [GET] /api/v1/person from user: 4fe5e06a-6h33-4661-a9ab-ee8d82523ca7"
        },
        "fields": {
          "userid": [
            "4fe5e06a-6h33-4661-a9ab-ee8d82523ca7"
          ]
        }
      }
    ]
  },
  "aggregations": {
    "userdetails": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "4fe5e06a-6h33-4661-a9ab-ee8d82523ca7",
          "doc_count": 1
        }
      ]
    }
  }

我建议在索引时使用grok模式创建userid字段,如果您能够重新索引数据,则直接使用该字段进行聚合。

相关问题