我想在我的kafka streams应用程序中使用state store并使用这个非常好的示例来实现重复数据消除:
https://github.com/confluentinc/kafka-streams-examples/blob/5.5.0-post/src/test/java/io/confluent/examples/streams/eventdeduplicationlambdaintegrationtest.java
我对这个例子有几个问题。
正如我正确理解的,这个例子简单地做到了:
消息进入输入主题
查看存储,如果它不存在,则写入状态存储并返回
如果确实存在,请删除记录,以便应用重复数据消除。
但在代码示例中,有一个时间窗口大小可以确定。另外,状态存储中消息的保留时间。您还可以通过给timestamp timefrom+timeto检查记录是否在存储中
final long eventTime = context.timestamp();
final WindowStoreIterator<String> timeIterator = store.fetch(
key,
eventTime - leftDurationMs,
eventTime + rightDurationMs
);
“时间到”和“时间从”的实际用途是什么?我不知道为什么我要检查下一个时间间隔,因为我正在检查尚未进入我的主题的未来消息?
我的第二个问题是这个时间间隔是否相关,是否应该触及上一个时间窗口?
如果我能够通过给timeto和timefrom来搜索时间间隔,为什么时间窗口的大小很重要?
如果我给窗口大小12小时,我能保证我的重复消除邮件12小时吗?
我这样想:
在应用程序启动的第一分钟,第一条消息带有“a”键,11小时后,带有“a”键的消息再次出现。我可以通过提供足够的时间间隔(比如eventtime-12小时)来捕获这个重复的消息吗?
谢谢你的建议!
1条答案
按热度按时间7qhs6swi1#
时间窗口的大小决定了你想要“复制”运行多长时间,永远不复制或者只在5分钟内。Kafka必须保存这些记录。大的时间窗口可能会占用服务器的大量资源。
timefrom和timeto,因为您的记录(事件)可能在Kafka到达/处理得晚,所以记录的事件时间是1分钟前,而不是现在。Kafka的过程是一个“旧”的记录,这就是它需要照顾的记录不是那么旧,相对“未来”的记录,以“旧”之一。