下面是使用kafka流的简单会话窗口:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
使用以下代码,我们可以配置状态存储:
Materialized
.as(Stores.persistentSessionStore(storeName, Duration.ofHours(2))
.withCachingEnabled()
.withLoggingEnabled()
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
文件说明:
请注意,保留期必须至少足够长,以包含窗口数据的整个生命周期,从窗口开始到窗口结束,以及整个宽限期。
我们不适用宽限期。但请考虑以下场景:会话窗口在保留期之前结束,而非活动间隔在保留期之后结束。我想知道,会话数据是否有丢失的可能?清理的力度有多大?
1条答案
按热度按时间vddsk6oq1#
这似乎是时间窗口商店的c&p错误。
比较代码:
https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/sessionwindowedkstreamimpl.java#l186
https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/timewindowedkstreamimpl.java#l166
我创建了一张jira罚单来修复它:https://issues.apache.org/jira/browse/kafka-9068