我有一个关于Kafka流时间窗口的问题,有些概念让我很困惑。
我们有一个主题,每天获得1000万个事件,日志保留时间为6天,因此主题总共包含6000万个事件。
实际上,只有当前事件对我们来说是有趣的,其余的我们保留5天只是为了审计的原因。
现在我从中创建了一个ktable,我正在执行loadall操作并遍历事件。正如我之前提到的,实际上我们只对当前事件感兴趣,而不是6000万个事件,所以我在ktable定义中打开了数据窗口。
.windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1))
现在,当我用下面的语句加载all事件时,一切都正常运行。
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
这个问题在一天的早期,它会加载100万个事件,但是稍后会加载1000万个,所以我必须迭代1000万个事件,当我们在批处理模式下工作时,我想我可以进一步优化它,只加载最后一个小时的事件,所以对于相同的ktable配置,我尝试使用下面的语句。
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但令我惊讶的是,这并没有返回任何数据。
有人能解释一下为什么这没有返回任何结果吗,我想我误解了时间窗口的概念。
然后我做了一些进一步的测试,并将我的ktable配置更改为如下。
.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)).until(TimeUnit.DAYS.toMillis(1)))
现在这个查询按我所希望的那样运行
store().fetchAll(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1), System.currentTimeMillis())
但我不确定我走的是正确的道路。。。
如果我将下面的语句用于最新的ktable配置,那么这是否会从当前日期起为我提供1000万个事件?
store().fetchAll(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1), System.currentTimeMillis())
1条答案
按热度按时间zsbz8rwp1#
在窗口存储上使用交互式查询时,时间范围将应用于窗口开始时间戳。因此,如果您有一个1天的窗口,并从
[now - 1 hour, now)
,您将找不到任何匹配的窗口,因为在此时间范围内没有窗口开始。