An alternative approach to the cumulative cardinality aggregation in Elasticsearch

tuwxkamq · published 2021-06-14 in ElasticSearch

I'm running an Elasticsearch cluster on AWS that has no access to x-packs, but I'd still like to run a cumulative cardinality aggregation to determine the number of new daily users on my site.
Is there an alternative way to solve this problem?
For example, how could I convert:

GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users" 
          }
        }
      }
    }
  }
}

so that it produces the same result as cumulative_cardinality?
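For intuition, here is a minimal Python sketch (with hypothetical sample data) of what the aggregation computes: a day's cumulative cardinality is the size of the union of all user-ID sets up to and including that day, and the day-over-day increase of that number is the count of new users.

```python
# Hypothetical sample data: the user IDs seen on each day.
daily_hits = {
    "2020-10-01": ["1"],
    "2020-10-02": ["1", "3"],
}

seen = set()
cumulative = {}  # cumulative_cardinality per day
for day in sorted(daily_hits):
    seen.update(daily_hits[day])
    cumulative[day] = len(seen)  # distinct users seen up to this day

# New users per day = increase of the cumulative count over the previous day.
prev = 0
new_users = {}
for day in sorted(cumulative):
    new_users[day] = cumulative[day] - prev
    prev = cumulative[day]

print(cumulative)  # {'2020-10-01': 1, '2020-10-02': 2}
print(new_users)   # {'2020-10-01': 1, '2020-10-02': 1}
```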

hc2pp10m1#

Cumulative cardinality was added precisely for this reason: it wasn't easy to compute before...
But, as with almost everything in Elasticsearch, there's a script that'll get it done for you. Here's my take on it.
Set up the index:

PUT user_hits
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "user_id": {
        "type": "keyword"
      }
    }
  }
}

Add 1 new user on one day and 2 more on the next, one of which is not strictly "new":

POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-01"}

POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-02"}

POST user_hits/_doc
{"user_id":3,"timestamp":"2020-10-02"}

Mimic the date histogram with a parameterized start date plus a number of days, group the users accordingly, and then compare each day's results vis-à-vis the previous day:

GET /user_hits/_search
{
  "size": 0,
  "query": {
    "range": {
      "timestamp": {
        "gte": "2020-10-01"
      }
    }
  }, 
  "aggs": {
    "new_users_count_vs_prev_day": {
      "scripted_metric": {
        "init_script": """
          state.by_day_map = [:];
          state.start_millis = new SimpleDateFormat("yyyy-MM-dd").parse(params.start_date).getTime();
          state.day_millis = 24 * 60 * 60 * 1000;
          state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        """,
        "map_script": """
          for (def step = 1; step < params.num_of_days + 1; step++) {
            def timestamp = doc.timestamp.value.millis;
            def user_id = doc['user_id'].value;
            def anchor = state.start_millis + (step * state.day_millis);
            // add a `n__` prefix to more easily sort the resulting map later on
            def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));

            if (timestamp <= anchor) {
              if (state.by_day_map.containsKey(anchor_pretty)) {
                state.by_day_map[anchor_pretty].add(user_id);
              } else {
                state.by_day_map[anchor_pretty] = [user_id];
              }
            }
          }
        """,
        "combine_script": """
            List keys=new ArrayList(state.by_day_map.keySet());
            Collections.sort(keys);

            def unique_sorted_map = new TreeMap();
            def unique_from_prev_day = [];

            for (def key : keys) { 
              def unique_users_per_day = new HashSet(state.by_day_map.get(key));

              unique_users_per_day.removeIf(user -> unique_from_prev_day.contains(user));

               // remove the `n__` prefix (3 chars, so this assumes num_of_days <= 9)
               unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
               unique_from_prev_day.addAll(unique_users_per_day);
            }
            return unique_sorted_map;
        """,
        "reduce_script": "return states",
        "params": {
          "start_date": "2020-10-01",
          "num_of_days": 5
        }
      }
    }
  }
}

Yielding:

"aggregations" : {
  "new_users_count_vs_prev_day" : {
    "value" : [
      {
        "2020-10-01" : 1,    <-- 1 new unique user            
        "2020-10-02" : 1,    <-- another new unique user
        "2020-10-03" : 0,
        "2020-10-04" : 0,
        "2020-10-05" : 0
      }
    ]
  }
}
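The same logic can be checked outside Elasticsearch. The sketch below is a plain-Python re-implementation (not the Painless itself) of the map/combine phases, run on the three sample documents:

```python
from datetime import date, timedelta

# The three sample documents as (user_id, timestamp) pairs.
docs = [
    ("1", date(2020, 10, 1)),
    ("1", date(2020, 10, 2)),
    ("3", date(2020, 10, 2)),
]

start = date(2020, 10, 1)
num_of_days = 5

# "map" phase: for every day in the window, bucket all users whose
# timestamp falls on or before that day (mirrors `timestamp <= anchor`).
by_day = {}
for step in range(num_of_days):
    day = start + timedelta(days=step)
    by_day[day.isoformat()] = [u for u, ts in docs if ts <= day]

# "combine" phase: walk the days in order and count only users
# not already seen on an earlier day.
seen = set()
new_users_per_day = {}
for day in sorted(by_day):
    fresh = set(by_day[day]) - seen
    new_users_per_day[day] = len(fresh)
    seen |= fresh

print(new_users_per_day)
# {'2020-10-01': 1, '2020-10-02': 1, '2020-10-03': 0, '2020-10-04': 0, '2020-10-05': 0}
```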

This script will certainly be slow, but it has one potentially very useful advantage: you can adjust it to return the full list of new user IDs, not just their count. Cumulative cardinality, by design (according to its implementing author), can only work in a sequential, cumulative fashion.
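As a sketch of that adjustment (plain Python rather than Painless, on the same sample data), returning the IDs instead of the counts only requires keeping the per-day difference sets:

```python
# The sample documents as (user_id, day) pairs.
docs = [("1", "2020-10-01"), ("1", "2020-10-02"), ("3", "2020-10-02")]

seen = set()
new_ids_per_day = {}
for day in sorted({d for _, d in docs}):
    today = {u for u, d in docs if d == day}
    new_ids_per_day[day] = sorted(today - seen)  # the IDs themselves, not a count
    seen |= today

print(new_ids_per_day)  # {'2020-10-01': ['1'], '2020-10-02': ['3']}
```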
