我正在尝试聚合流以获取窗口流中的用户id计数。流没有密钥,因此需要从值中获取用户id并聚合,然后将该窗口中的活动用户数打印到控制台/api。代码如下:
final KStream<String, avroschema> feeds = builder.stream("input_topic");
final KTable<String, Long> aggregated = feeds
// map the user id as key
.map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
.groupByKey()
.count("state_store");
aggregated.print();
我得到的结果是:
[KSTREAM-AGGREGATE-0000000002]: 123 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 456 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 789 , (1<-null)
我怎样才能只打印下面输出中的计数?
user_count 3
我试着计算如下:
KTable<Windowed<String>, Long> countUsers = feeds
// map the user name as key, because the subsequent counting is performed based on the key
.map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
// count users, using one-minute tumbling windows
.countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L))
但它显示的错误如下。有什么问题吗?
Cannot resolve method 'countByKey(org.apache.kafka.streams.kstream.TimeWindows)'
1条答案
按热度按时间yzuktlbb1#
如果将用户id设置为key,则计算每个用户出现的频率。这个数字显然是
1
.如果要对所有用户进行计数,则需要为所有要计数的记录设置相同的“伪键”。
对于编译错误:它只是错误的代码。阅读文档:https://kafka.apache.org/10/documentation/streams/developer-guide/dsl-api.html#id12
不确定您使用的是什么版本,但是
.countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L))
是在0.11版本中更改的旧api。