将活动用户数从窗口kafka流打印到控制台

imzjd6km  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(338)

我正在尝试聚合流以获取窗口流中的用户id计数。流没有密钥,因此需要从值中获取用户id并聚合,然后将该窗口中的活动用户数打印到控制台/api。代码如下:

final KStream<String, avroschema> feeds = builder.stream("input_topic");
final KTable<String, Long> aggregated = feeds
            // map the user id as key
            .map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
            .groupByKey()
            .count("state_store");
aggregated.print();

我得到的结果是:

[KSTREAM-AGGREGATE-0000000002]: 123 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 456 , (1<-null)
[KSTREAM-AGGREGATE-0000000002]: 789 , (1<-null)

我怎样才能只打印下面输出中的计数?

user_count 3

我试着计算如下:

KTable<Windowed<String>, Long> countUsers = feeds
            // map the user name as key, because the subsequent counting is performed based on the key
            .map((key, value) -> new KeyValue<>(value.getUserId().toString(), value))
            // count users, using one-minute tumbling windows
            .countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L))

但它显示的错误如下。有什么问题吗?

Cannot resolve method 'countByKey(org.apache.kafka.streams.kstream.TimeWindows)'
yzuktlbb

yzuktlbb1#

如果将用户id设置为key,则计算每个用户出现的频率。这个数字显然是 1 .
如果要对所有用户进行计数,则需要为所有要计数的记录设置相同的“伪键”。
对于编译错误:它只是错误的代码。阅读文档:https://kafka.apache.org/10/documentation/streams/developer-guide/dsl-api.html#id12
不确定您使用的是什么版本,但是 .countByKey(TimeWindows.of("UserCountWindow", 60 * 1000L)) 是在0.11版本中更改的旧api。

相关问题