如何计算在一定时间段内使用kafka流生成事件的用户数?

oaxa6hgo  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(333)

我有流事件,其中有用户id。我想计算在一定时间内有多少不同的用户生成一个事件。然而,我是Kafka的初学者,我不能应付这个问题。
1分钟内的示例事件;

{"event_name": "viewProduct", "user_id": "12"}
{"event_name": "viewProductDetails", "user_id": "23"}
{"event_name": "viewProductComments", "user_id": "12"}
{"event_name": "viewProduct", "user_id": "23"}
{"event_name": "viewProductComments", "user_id": "32"}

我的代码应该生成有3个活跃用户根据上述事件。
我的方法如下,但是这个解决方案不能消除来自同一个用户的多个事件,并且对同一个用户进行多次计数。

builder.stream("orders") // read from orders toic
                .mapValues(v -> { // get user_id via json parser
                    JsonNode jsonNode = null;
                    try {
                        jsonNode = objectMapper.readTree((String) v);
                        return jsonNode.get("user_id").asText();
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                        return "";
                    })
                .selectKey((k, v) -> "1") // put same key to every user_id
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(1))) // use time windows
                .count() // count values
fcwjkofz

fcwjkofz1#

我可能错过了什么,你为什么不这样做:

.selectKey((k, v) -> v)

这将按值对记录进行分组,您以前使用该值填充了记录 user_id .

相关问题