我用Kafka流来处理时间序列数据。一个用例是每小时为每个传感器聚合数据(传感器id是主题中的消息键) test
).
我编写了一个管道,它按键(传感器id)分组,然后每小时计算一次读数。
问题是数据库中有一些重复的消息 test
主题(相同的传感器id和时间戳)。我只想考虑最新的消息。
streams dsl api中有什么东西可以实现这一点吗?
meterDataStream
.groupByKey()
.count(
TimeWindows
.of(TimeUnit.HOURS.toMillis(1))
.until(TimeUnit.HOURS.toMillis(1)),
"counts")
.foreach((key, value) => {
val start = epochMillistoDate(key.window().start())
val end = epochMillistoDate(key.window().end())
logger.info(s"$start - $end\t->$value")
})
1条答案
按热度按时间r55awzrz1#
为此,您需要构建自己的重复数据消除运营商。
重复数据消除器(即,
Transformer
)必须有一个附加的状态存储,你可能想 checkout 标点符号。查看文档了解更多详细信息:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#applying-处理器和转换器处理器api集成
https://docs.confluent.io/current/streams/developer-guide/processor-api.html