kafka streams时间序列聚合

wfveoks0  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(443)

我用Kafka流来处理时间序列数据。一个用例是每小时为每个传感器聚合数据(传感器id是主题中的消息键) test ).
我编写了一个管道,它按键(传感器id)分组,然后每小时计算一次读数。
问题是数据库中有一些重复的消息 test 主题(相同的传感器id和时间戳)。我只想考虑最新的消息。
streams dsl api中有什么东西可以实现这一点吗?

meterDataStream
   .groupByKey()
   .count(
     TimeWindows
       .of(TimeUnit.HOURS.toMillis(1))
       .until(TimeUnit.HOURS.toMillis(1)), 
     "counts")
   .foreach((key, value) => {
     val start = epochMillistoDate(key.window().start())
     val end   = epochMillistoDate(key.window().end())
     logger.info(s"$start - $end\t->$value")
   })
r55awzrz

r55awzrz1#

为此,您需要构建自己的重复数据消除运营商。

meterDateStream
    .transform(/*write your own deduplicator*/)
    .groupByKey()....

重复数据消除器(即, Transformer )必须有一个附加的状态存储,你可能想 checkout 标点符号。查看文档了解更多详细信息:
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#applying-处理器和转换器处理器api集成
https://docs.confluent.io/current/streams/developer-guide/processor-api.html

相关问题