如何使用elasticsearch-dsl-py获取Kibana表格可视化的结果

ecfdbz9o  于 2022-10-06  发布在  Kibana
关注(0)|答案(1)|浏览(150)

我想要创建一个函数,该函数将主机名列表和字段列表作为参数,并动态地构造一个高效的Elasticsearch查询,以便为传入的每个主机检索每个字段的最新值,就像我在Kibana中能做到的那样。

在Kibana,这看起来像这样:

Kibana生成的查询:

{
  "aggs": {
    "2": {
      "terms": {
        "field": "host.hostname",
        "order": {
          "_key": "desc"
        },
        "size": 5
      },
      "aggs": {
        "1": {
          "top_hits": {
            "fields": [
              {
                "field": "host.ip"
              }
            ],
            "_source": false,
            "size": 1,
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        },
        "3": {
          "top_hits": {
            "fields": [
              {
                "field": "source.ip"
              }
            ],
            "_source": false,
            "size": 1,
            "sort": [
              {
                "@timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  },
  "size": 0,
  "script_fields": {},
  "stored_fields": [
    "*"
  ],
  "runtime_mappings": {},
  "query": {
    "bool": {
      "must": [],
      "filter": [
        {
          "bool": {
            "should": [
              {
                "bool": {
                  "should": [
                    {
                      "match_phrase": {
                        "host.hostname": "p-hostname-a"
                      }
                    }
                  ],
                  "minimum_should_match": 1
                }
              },
              {
                "bool": {
                  "should": [
                    {
                      "match_phrase": {
                        "host.hostname": "P-hostname-H"
                      }
                    }
                  ],
                  "minimum_should_match": 1
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "range": {
            "@timestamp": {
              "format": "strict_date_optional_time",
              "gte": "2022-09-20T07:57:26.189Z",
              "lte": "2022-09-21T07:57:26.189Z"
            }
          }
        }
      ],
      "should": [],
      "must_not": []
    }
  }
}

我在python中的做法:

def get_data_from_elastic(self, fields_to_get, set_of_host_hostname):
        """Build a terms/top_hits aggregation query, print it, run it and print the hits.

        Args:
            fields_to_get: iterable of field names; one ``top_hits`` metric
                named ``name_<field>`` is registered per entry.
            set_of_host_hostname: iterable of host names; converted to a list
                and truncated to its first two entries before use.

        Returns:
            None. The query dict and every hit are printed.
        """
        s = Search().using(client=self.es_con).index(self.METRICBEAT_INDEX_NAME)
        # Restrict the search to documents whose @timestamp is >= now-3600m/m.
        s = s.filter("range",**{'@timestamp': {'gte': 'now-3600m/m'}})
        set_of_host_hostname = list(set_of_host_hostname)

        # Only the first two host names are kept from here on.
        set_of_host_hostname = set_of_host_hostname[0:2]

        # Terms bucket on host.hostname; size=1 asks for a single bucket even
        # though up to two host names are queried below.
        s.aggs.bucket(name=f"main_bucket", agg_type="terms", field="host.hostname", size=1)
        for field in fields_to_get:
            # One top_hits metric per field. No sort, size or _source option is
            # passed here, so the top_hits defaults apply.
            s.aggs["main_bucket"].metric(name=f"name_{field}", agg_type="top_hits", fields=[{"field":field}])

        # NOTE(review): the return value is discarded, unlike the filter()/query()
        # calls in this function which reassign `s` — presumably this call has
        # no effect on the query that is executed; confirm.
        s.source(fields=fields_to_get)
        # OR-combination of one `match` clause per host name.
        main_q = Q("bool", minimum_should_match=1)
        for host_hostname in set_of_host_hostname:
            q = Q("match",**{"host.hostname":host_hostname})
            main_q.should.append(q)
        s = s.query(main_q)
        # print("nnn")
        pp(s.to_dict())
        print("nnn")
        res = s.execute()
        # Iterates over the returned document hits; the aggregation results
        # registered above are never read in this function.
        for hit in res:
            print(hit)
            print(hit.host.ip)

结果:

{    "aggs":{
      "main_bucket":{
         "aggs":{
            "name_host.ip":{
               "top_hits":{
                  "fields":[
                     {
                        "field":"host.ip"
                     }
                  ]
               }
            },
            "name_host.os.type":{
               "top_hits":{
                  "fields":[
                     {
                        "field":"host.os.type"
                     }
                  ]
               }
            }
         },
         "terms":{
            "field":"host.hostname",
            "size":1
         }
      }    },    "query":{
      "bool":{
         "filter":[
            {
               "range":{
                  "@timestamp":{
                     "gte":"now-3600m/m"
                  }
               }
            }
         ],
         "minimum_should_match":1,
         "should":[
            {
               "match":{
                  "host.hostname":"P-hostname-2"
               }
            },
            {
               "match":{
                  "host.hostname":"P-hostname-1"
               }
            }
         ]
      }    } }

<Hit(metricbeat-7.17.1-system-2022.38/AEESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/AUESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/AkESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/A0ESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/BEESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/BUESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/BkESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/B0ESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/CEESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']
<Hit(metricbeat-7.17.1-system-2022.38/CUESYIMB2-liSoZOlAYu): {'agent': {'version': '7.17.1'}, '@timestamp': '2022-09-21T1...}>
['1thesameip0']

我尝试了几个变体,但无法复制kibana查询。有人能指导我如何复制Kibana查询吗?

elasticsearch-dsl的版本为7.4

ryoqjall

ryoqjall1#

我有一个可行的解决方案:

main_q应该通过.filter()添加(而不是.query())

应该添加顶层的“size=0”选项,以便不返回任何文档

应该显式地从结果的聚合(aggregations)字段中读取结果。

存储桶的参数“size”应该等于主机的数量

应该将参数“_source=False, size=1”添加到指标(top_hits)中,以便每个指标只返回一条包含所需字段的记录。

def get_data_from_elastic(self, fields_to_get, set_of_host_hostname):
    """Print, per host, the most recent value of each requested field.

    Builds a single search with no document hits (``size=0``) that buckets
    documents by ``host.hostname`` and, inside each bucket, attaches one
    ``top_hits`` metric per requested field. Each metric returns exactly one
    hit, sorted by ``@timestamp`` descending, so the hit carries the latest
    value of that field.

    Args:
        fields_to_get: iterable of field names (e.g. ``"host.ip"``).
        set_of_host_hostname: iterable of host names to look up.

    Returns:
        None. Every aggregation bucket is pretty-printed as a dict. Nothing
        is queried or printed when no host names are given.
    """
    fields_to_get = list(fields_to_get)
    # NOTE(review): only the first two hosts are queried. This truncation is
    # kept from the original snippet (it looks like a debugging leftover);
    # drop the slice to query every host passed in.
    hostnames = list(set_of_host_hostname)[0:2]
    if not hostnames:
        # Without hosts the terms aggregation would be built with size=0;
        # there is nothing to look up, so skip the round trip entirely.
        return

    s = Search().using(client=self.es_con).index(self.METRICBEAT_INDEX_NAME)
    s = s.filter("range", **{'@timestamp': {'gte': 'now-24h/h'}})

    # One bucket per requested host.
    s.aggs.bucket(name="main_bucket", agg_type="terms", field="host.hostname", size=len(hostnames))
    # Top-level size=0: only the aggregations are wanted, not document hits.
    s.update_from_dict({"size": 0})
    for field in fields_to_get:
        s.aggs["main_bucket"].metric(
            name=f"name_{field}",
            agg_type="top_hits",
            fields=[{"field": field}],
            _source=False,
            size=1,
            # Newest document first, so the single returned hit holds the
            # last recorded value (mirrors the Kibana-generated query).
            sort=[{"@timestamp": {"order": "desc"}}],
        )

    s = s.source(False)

    # Host restriction: OR of one match_phrase clause per host, applied in
    # filter context like the timestamp range.
    main_q = Q("bool", minimum_should_match=1)
    for host_hostname in hostnames:
        bool_q = Q("bool", minimum_should_match=1)
        bool_q.should.append(Q("match_phrase", **{"host.hostname": host_hostname}))
        main_q.should.append(bool_q)
    s = s.filter(main_q)

    res = s.execute()
    # "main_bucket" is the only top-level aggregation registered above.
    for bucket in res.aggregations["main_bucket"].buckets:
        pp(bucket.to_dict())

相关问题