我有流事件,其中有用户id。我想计算在一定时间内有多少不同的用户生成一个事件。然而,我是Kafka的初学者,我不能应付这个问题。
1分钟内的示例事件;
{"event_name": "viewProduct", "user_id": "12"}
{"event_name": "viewProductDetails", "user_id": "23"}
{"event_name": "viewProductComments", "user_id": "12"}
{"event_name": "viewProduct", "user_id": "23"}
{"event_name": "viewProductComments", "user_id": "32"}
我的代码应该生成有3个活跃用户根据上述事件。
我的方法如下,但是这个解决方案不能消除来自同一个用户的多个事件,并且对同一个用户进行多次计数。
builder.stream("orders") // read from orders toic
.mapValues(v -> { // get user_id via json parser
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree((String) v);
return jsonNode.get("user_id").asText();
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return "";
})
.selectKey((k, v) -> "1") // put same key to every user_id
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(1))) // use time windows
.count() // count values
1条答案
按热度按时间fcwjkofz1#
我可能错过了什么,你为什么不这样做:
这将按值对记录进行分组,您以前使用该值填充了记录
user_id
.