在我的情况下,我正在执行 hopping window
例如。 (100sec,1sec)
.
KTable<Windowed<String>, aggrTest> WinMinMax = Records.groupByKey().aggregate(new aggrTestInitilizer(),
new minMaxCalculator()
, TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).advanceBy(TimeUnit.SECONDS.toMillis(1)),aggrMessageSerde);
但是这里100秒窗口中的消息数量非常大,这导致窗口执行需要花费大量时间。所以,为了提高这个窗口的执行时间,
我希望避免将数据写入中间聚合状态存储(kafka流在默认情况下写入)。
另外,如果(1)不可能,那么我们可以将窗口生成的中间聚合状态存储在ram中而不是磁盘中吗?(相同的设置是什么?)
有没有进一步的改进窗口执行时间的建议?
1条答案
按热度按时间gopyfrb31#
不知道你是怎么实现你的目标的
MinMaxCalculator()
但我假设它只是将当前的min/max与新值进行比较。因此,存储只包含当前聚合。-因此,窗口大小根本不重要,因为与窗口大小无关,您只存储键和当前聚合结果。回答您的问题:
按照设计,聚合需要一个存储来保留当前聚合结果——因此,您无法摆脱存储。
是的,你可以在内存存储中使用。这个
aggregate()
方法有一个重载,允许您放置一个自定义存储,并且有一些实用程序类要在内存存储中创建。查看文档(顺便说一句:这些api在kafka 1.0中简化了很多,所以如果您还没有使用1.0,我建议您升级):https://docs.confluent.io/current/streams/developer-guide/processor-api.html#enable-或禁用状态存储更改日志的容错如上所述,窗口的大小不影响计算“速度”
旁注:
如果使用内存状态而不是rocksdb,则会将存储大小限制为ram大小—如果保留时间太长,则可能会出现问题,因为状态可能会变得非常大
如果您进行滚动反弹,这将需要更多的时间,因为需要通过读取完整的changelog主题来重新创建状态——rocksdb存储可以从本地磁盘恢复更快的状态
您可以尝试使用rocksdb并增加ktable缓存大小以提高性能:https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html