我目前正在使用kafka streams解决方案检索用户浏览会话 SessionWindows
. 我的拓扑结构如下所示:
builder
.stream(...)
.map(... => (newKey, value))
.groupByKey(...)
.windowedBy(SessionWindows.`with`(INACTIVITY_GAP).grace(GRACE))
.aggregate(... into list of events)
.suppress(Suppressed.untilWindowCloses(unbounded()))
这个简单的场景很适合我,但是我需要对抑制逻辑进行一些额外的检查。也就是说,我想强制刷新超过给定大小的所有会话(例如,所有内部事件超过1000个的会话,这些事件都是在非活动间隙内生成的)。我的问题是如何实现这一点?
我知道 .suppress()
方法不接受的任何自定义实现 Suppressed
. 所以我在考虑更换 .suppress()
与 .transform()
按照我的习惯 Transformer
用一个 SessionStore
里面可以做抑制逻辑,也可以应用这些额外的检查。然而,当涉及到向存储添加/删除条目以及自己实现基本的“untilwindowclosed”抑制时,我遇到了一个困难:我可能会执行定期刷新 ProcessorContext.schedule()
但是 SessionStore
不提供遍历所有键的可能性。
这是个好方向吗?有没有其他方法可以为会话添加大小限制?
1条答案
按热度按时间d5vmydt91#
这可能就是你想要的:
org.apache.kafka.streams.kstream.suppressed#untiltimelimit org.apache.kafka.streams.kstream.suppressed.bufferconfig#maxrecords