我尝试使用kafka流,使用不同大小的时间窗口来聚合大量数据。
我将缓存大小增加到2GB,但是当我在1小时内设置窗口大小时,我得到了100%的cpu负载,应用程序开始减速。
我的代码如下所示:
val tradeStream = builder.stream<String, Trade>(configuration.topicNamePattern, Consumed.with(Serdes.String(), JsonSerde(Trade::class.java)))
tradeStream
.groupBy(
{ _, trade -> trade.pair },
Serialized.with(JsonSerde(TokensPair::class.java), JsonSerde(Trade::class.java))
)
.windowedBy(TimeWindows.of(windowDuration).advanceBy(windowHop).until(windowDuration))
.aggregate(
{ Ticker(windowDuration) },
{ _, newValue, aggregate -> aggregate.add(newValue) },
Materialized.`as`<TokensPair, Ticker>(storeByPairs)
.withKeySerde(JsonSerde(TokensPair::class.java))
.withValueSerde(JsonSerde(Ticker::class.java))
)
.toStream()
.filter { tokensPair, _ -> filterFinishedWindow(tokensPair.window(), windowHop) }
.map { tokensPair, ticker -> KeyValue(
TickerKey(ticker.tokensPair!!, windowDuration, Timestamp(tokensPair.window().start())),
ticker.calcPrice()
)}
.to(topicName, Produced.with(JsonSerde(TickerKey::class.java), JsonSerde(Ticker::class.java)))
此外,在将聚合的数据发送到kafka主题之前,它们会根据窗口的结束时间进行过滤,以便发送到刚刚完成的主题窗口。
也许有更好的方法来实现这种聚合?
1条答案
按热度按时间alen0pnh1#
如果我们对系统了解得不够多,就很难诊断。
集群中有多少个分区?您正在运行多少个流应用程序?流应用程序是否在同一台计算机上运行?你对有效载荷使用压缩吗?它对较小的间隔有效吗?
希望有帮助。