How do I count records by event_type in Elasticsearch?

yhxst69z  posted on 2023-03-01  in  ElasticSearch

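In the sample response below I have 4 hits, and one user appears twice with different event_type values. I want to count by event_type, using only the last document for each unique user_id.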

{
  "hits": [
    {
      "_index": "index_test",
      "_type": "_doc",
      "_id": "0yVSN4YBbqa8KnE1E9FS",
      "_score": null,
      "_source": {
        "event_type": "1",
        "user_id": "11777",
        "event_date": "2023-02-20 07:24:28"
      },
      "sort": [1675965370212]
    },
    {
      "_index": "index_test",
      "_type": "_doc",
      "_id": "mXpSN4YBLFzGpeA-E4VI",
      "_score": null,
      "_source": {
        "event_type": "1",
        "user_id": "11677",
        "event_date": "2023-02-20 08:15:28"
      },
      "sort": [1675965370207]
    },
    {
      "_index": "index_test",
      "_type": "_doc",
      "_id": "mnpSN4YBLFzGpeA-E4VM",
      "_score": null,
      "_source": {
        "event_type": "2",
        "user_id": "11777",
        "event_date": "2023-02-20 08:22:28"
      },
      "sort": [1675965370210]
    },
    {
      "_index": "index_test",
      "_type": "_doc",
      "_id": "mHpSN4YBLFzGpeA-E4VD",
      "_score": null,
      "_source": {
        "event_type": "7",
        "user_id": "11293",
        "event_date": "2023-02-20 08:27:28"
      },
      "sort": [1675965370202]
    }
  ]
}

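I tried top_hits, which returns the last document for each user. But there is a problem: the limit there is 10000 and my data has more than 50000 entries, so not all results come back.
Second, I only want the counts per event type; I don't need the documents themselves. Please help me achieve this with an Elasticsearch query.
For the hits above, I expect output like this: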

{
  "aggregations": {
    "last_activities": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": 1,
          "doc_count": 2,
          "unique_user": {
            "value": 1
          }
        },
        {
          "key": 2,
          "doc_count": 1,
          "unique_user": {
            "value": 1
          }
        },
        {
          "key": 7,
          "doc_count": 1,
          "unique_user": {
            "value": 1
          }
        }
      ]
    }
  }
}
Note: event_type 1 has two records, but the last event_type for user_id 11777 is 2, so that user is not counted under event_type 1 in my aggregation.

Aggregation query:
{
  "aggs": {
    "last_activities": {
      "terms": {
        "field": "user_id",
        "size": 10000
      },
      "aggs": {
        "data": {
          "top_hits": {
            "size": 1,
            "_source": [
              "user_id",
              "event_type"
            ],
            "sort": {
              "created_date": "desc"
            }
          }
        }
      }
    }
  }
}

rkkpypqq1#

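This can't be done with the regular aggregations, but it can be done with the scripted_metric aggregation, which lets you implement your own logic. Below is my attempt. The code is commented in places, but it should be fairly straightforward.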

POST test-events/_search
{
  "aggs": {
    "last_activities": {
      "scripted_metric": {
        "init_script": "state.docs = []",
        "map_script": "state.docs.add(new HashMap(params['_source']))",
        "combine_script": "return state.docs",
        "reduce_script": """
          def users = new HashMap(); 
          def eventTypes = new HashMap();

          for (state in states) { 
            for (d in state) { 
              def eventKey = d.event_type.toString();
              // 1. check if the event type has not been seen yet
              if (!eventTypes.containsKey(eventKey)) {
                // new event type bucket
                eventTypes[eventKey] = [
                  'key': d.event_type,
                  'docCount': 1,
                  'users': new HashSet(),
                  // start at 0: only users whose last event has this type are counted below
                  'uniqueUsers': 0
                ];
              } else {
                eventTypes[eventKey].docCount++;
              }
              
              // 2a. check if the user has not been seen yet
              if (!users.containsKey(d.user_id)) {
                users[d.user_id] = new HashMap(d);
              }
              // 2b. otherwise keep this doc if its event_date is more recent
              // ("yyyy-MM-dd HH:mm:ss" strings sort chronologically)
              else {
                if (d.event_date.compareTo(users[d.user_id].event_date) > 0) {
                  users[d.user_id] = d;
                }
              }
            }
          }
          
          // add the unique user count
          users.values().stream().forEach(user -> {
            def eventKey = user.event_type.toString();
            eventTypes[eventKey].users.add(user.user_id);
            eventTypes[eventKey].uniqueUsers = eventTypes[eventKey].users.size();
          });

          eventTypes.keySet().stream().forEach(eventKey -> {
            eventTypes[eventKey].remove('users');
          });
          
          return eventTypes;
        """
      }
    }
  }
}

Running this query yields the result you expect:

"aggregations" : {
    "last_activities" : {
      "value" : {
        "1" : {
          "key" : "1",
          "docCount" : 2,
          "uniqueUsers" : 1
        },
        "2" : {
          "key" : "2",
          "docCount" : 1,
          "uniqueUsers" : 1
        },
        "7" : {
          "key" : "7",
          "docCount" : 1,
          "uniqueUsers" : 1
        }
      }
    }
  }
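
As a cross-check outside Elasticsearch, the same counting logic can be sketched in plain Python. This is only an illustration under assumptions: the function name `count_last_activities` is hypothetical, the documents are the four hits from the question, and a user's "last" document is taken to be the one with the greatest `event_date` (strings in this format sort chronologically).

```python
# Sample documents copied from the question's hits.
docs = [
    {"event_type": "1", "user_id": "11777", "event_date": "2023-02-20 07:24:28"},
    {"event_type": "1", "user_id": "11677", "event_date": "2023-02-20 08:15:28"},
    {"event_type": "2", "user_id": "11777", "event_date": "2023-02-20 08:22:28"},
    {"event_type": "7", "user_id": "11293", "event_date": "2023-02-20 08:27:28"},
]


def count_last_activities(docs):
    """Count docs per event_type, plus users whose latest doc has that type."""
    doc_count = {}      # event_type -> number of documents
    last_by_user = {}   # user_id -> that user's most recent document

    for d in docs:
        doc_count[d["event_type"]] = doc_count.get(d["event_type"], 0) + 1
        prev = last_by_user.get(d["user_id"])
        # Assumption: "last" means the greatest event_date.
        if prev is None or d["event_date"] > prev["event_date"]:
            last_by_user[d["user_id"]] = d

    unique_users = {}   # event_type -> users whose last event has this type
    for d in last_by_user.values():
        unique_users[d["event_type"]] = unique_users.get(d["event_type"], 0) + 1

    return {
        event_type: {
            "key": event_type,
            "docCount": count,
            "uniqueUsers": unique_users.get(event_type, 0),
        }
        for event_type, count in doc_count.items()
    }


if __name__ == "__main__":
    print(count_last_activities(docs))
```

On the sample data, user 11777 is counted only under event_type 2, which is why event_type 1 has two documents but a single unique user.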
