如何在Elasticsearch中聚合聚合结果?

yr9zkbsy  于 2022-11-02  发布在  ElasticSearch
关注(0)|答案(2)|浏览(273)

我想使用Elasticsearch聚合其他聚合的结果。我已经创建了所需的第一个聚合:

es.search(index='stackoverflow', body = {
    "size":0,
    "query": {
        "bool": {
          "filter": {
              "match" : {"type": "Posts"}
          },
          "filter": {
              "match" : {"PostTypeId": "1"}
          }
        }
    },
    "aggs" : {
        "by_user": {
          "terms": {
            "field": "OwnerUserId"
          }
        }
    }
})

此查询获取所有属于 post 类型的问题文档(PostTypeId = 1)。然后,它按 OwnerUserId 进行聚合,计算每个用户的问题帖子数,得到以下结果:

{'took': 0,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': None,
  'hits': []},
 'aggregations': {'by_user': {'doc_count_error_upper_bound': 0,
   'sum_other_doc_count': 31053,
   'buckets': [{'key': '2230', 'doc_count': 223},
    {'key': '', 'doc_count': 177},
    {'key': '38304', 'doc_count': 158},
    {'key': '5997', 'doc_count': 144},
    {'key': '4048', 'doc_count': 130},
    {'key': '25813', 'doc_count': 119},
    {'key': '27826', 'doc_count': 119},
    {'key': '2633', 'doc_count': 115},
    {'key': '19919', 'doc_count': 114},
    {'key': '13938', 'doc_count': 111}]}}}

现在,我想对上一次的结果进行另一次聚合:按 doc_count 进行聚合,我的意思是对问题帖子进行分组和计数。对于前面的结果,我希望得到的结果是:

{'buckets': [{'key': '223', 'doc_count': 1},
    {'key': '177', 'doc_count': 1},
    {'key': '158', 'doc_count': 1},
    {'key': '144', 'doc_count': 1},
    {'key': '130', 'doc_count': 1},
    {'key': '119', 'doc_count': 2},
    {'key': '115', 'doc_count': 1},
    {'key': '114', 'doc_count': 1},
    {'key': '111', 'doc_count': 1}]}
whlutmcx

whlutmcx1#

我可以找到一种方法来聚合聚合的结果(至少是直接聚合)。正如我在Elasticsearch的论坛上读到的那样,这种用例没有被考虑,因为它将是如此低效。
为了解决我的用例,我所做的是利用转换API将第一个聚合存储在一个临时索引中,然后在该索引上执行第二个聚合。
首先,我创建一个转换来执行第一次聚合(按OwnerUserId分组,并计算每个用户发布的问题数):

url = 'http://localhost:9200/_transform/transform_rq1'
headers = {
   'Content-Type': 'application/json'
}
query = {
  "source": {
    "index": "posts",
    "query": {
        "bool": {
          "filter": {
              "match" : {"PostTypeId": "1"}
          }
        }
    }
  },
  "dest": {
    "index": "rq1"
  },
  "pivot": {
    "group_by": {
      "OwnerUserId": {
        "terms": {
          "field": "OwnerUserId"
        }
      }
    },
    "aggregations": {
      "count": {
        "value_count": {
          "field": "OwnerUserId"
        }
      }
    }
  }
}

response = requests.put(url, headers=headers, data=json.dumps(query))

然后,我启动转换以执行它:

url = 'http://localhost:9200/_transform/transform_rq1/_start'
headers = {
   'Content-Type': 'application/json'
}

response = requests.post(url, headers=headers).json()

最后,我对创建的时态索引进行第二次聚合(按每个用户的问题数分组,得到有多少用户发布了多少问题):

response = es.search(index='rq1', body = {
    "size":0,
    "query": {
                "match_all": {}
             },
    "aggs" : {
        "by_num": {
          "terms": {
            "field": "count",
            "order" : { "_key" : "asc" },
            "size": 30000
          }
        }
    }
})

print(response)

如您所见,我用Python编写了这段代码。

yb3bgrhw

yb3bgrhw2#

“脚本化指标聚合”将有助于实现这一点,并从单个查询的响应中获取统计信息。
这种方法的优点是可以包含任何逻辑并检索必要的数据。
https://www.elastic.co/guide/en/elasticsearch/reference/6.4/search-aggregations-metrics-scripted-metric-aggregation.html
希望这将有助于其他人谁仍然在寻找解决同样类型的任务。

相关问题