希望有人知道这一点或者能给我指出正确的方向。。。
我有一个通过api rest请求创建的数据主题。rest请求中收到的一个字段是记录eventtime的时间戳。这些记录被生成给kafka,eventtime被设置为记录的元数据时间戳。
我还有另一个rules主题,它提供信息,通过向接收的值添加新字段来扩充数据主题记录。
这两个主题都有用于连接的匹配键。
我的目标是使用ProcessorAPI在所有处理阶段保存数据主题的eventtime。请注意,将有多个不同的kstreams应用程序以多种方式/步骤处理/扩充此数据。
好消息是,我看到很多东西表明在使用kafka流时,输入记录时间戳是保留的。
例如:
https://kafka.apache.org/documentation/streams/core-concepts#streams_time
输入记录时间戳和输出记录时间戳在源和接收器主题中是否相同?
同时也在读时间戳提取器:
https://cwiki.apache.org/confluence/display/kafka/faq#faq-如何编写acustomtimestampextractor
更多关于加入的信息:
https://cwiki.apache.org/confluence/display/kafka/kafka+streams+join+semantics
https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#kstream-globalktable联接
在许多streams文档中,我看到它提到“输入记录的时间戳将保留到输出记录”,但我不清楚在连接时它究竟是如何工作的。
我的困惑似乎是,当我们加入时,我们有两个不同的输入记录,并产生一个单一的输出记录。
如何确定在联接中使用的多个输入记录之间持久化哪个时间戳?
我已经和同事讨论过了,有如下几种观点
合并的输入记录的最早非负时间戳被持久化。
左输入记录的时间戳是持久的,例如。 leftStream.join(rightStream, ...);
触发联接的输入记录的时间戳(左或右)
它是不确定的,所以使用挂钟时间,除非为生产者指定了时间戳提取器。
其中有些人比其他人有更好的论点,但我需要知道到底发生了什么。。。
如有任何帮助或建议,我们将不胜感激。
1条答案
按热度按时间ykejflvf1#
目前(即Kafka2.0版本)没有使用时间戳的公共契约,并且允许实现使用任何策略。当前实现使用触发连接计算的记录的时间戳。
作为解决方法,您可以通过添加
.valueTransformer()
加入之后。比较https://cwiki.apache.org/confluence/display/kafka/kip-251%3a+allow+timestamp+manipulation+in+processor+api即,您需要在连接之前将原始时间戳嵌入到值负载中,并在连接之后提取它并设置为元数据时间戳。