具有会话大小限制的自定义会话窗口抑制

kmynzznz  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(389)

我目前正在使用kafka streams解决方案检索用户浏览会话 SessionWindows . 我的拓扑结构如下所示:

builder
 .stream(...)
 .map(... => (newKey, value))
 .groupByKey(...)
 .windowedBy(SessionWindows.`with`(INACTIVITY_GAP).grace(GRACE))
 .aggregate(... into list of events)
 .suppress(Suppressed.untilWindowCloses(unbounded()))

这个简单的场景很适合我,但是我需要对抑制逻辑进行一些额外的检查。也就是说,我想强制刷新超过给定大小的所有会话(例如,所有内部事件超过1000个的会话,这些事件都是在非活动间隙内生成的)。我的问题是如何实现这一点?
我知道 .suppress() 方法不接受的任何自定义实现 Suppressed . 所以我在考虑更换 .suppress().transform() 按照我的习惯 Transformer 用一个 SessionStore 里面可以做抑制逻辑,也可以应用这些额外的检查。然而,当涉及到向存储添加/删除条目以及自己实现基本的“untilwindowclosed”抑制时,我遇到了一个困难:我可能会执行定期刷新 ProcessorContext.schedule() 但是 SessionStore 不提供遍历所有键的可能性。
这是个好方向吗?有没有其他方法可以为会话添加大小限制?

d5vmydt9

d5vmydt91#

这可能就是你想要的:
org.apache.kafka.streams.kstream.suppressed#untiltimelimit org.apache.kafka.streams.kstream.suppressed.bufferconfig#maxrecords

.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(WAIT_UNTIL), Suppressed.BufferConfig.maxRecords(1000)))

相关问题