我需要在一天中一小时一小时地得到Kafka的信息。每一小时我将启动一个作业来使用1小时前生成的消息。e、 例如,如果当前时间是20:12,我将在19:00:00到19:59:59之间使用消息。这意味着我需要在19:00:00开始偏移,在19:59:59结束偏移。我之前使用了simpleconsumer.getoffsetsb,如“0.8.0 simpleconsumer示例”中所示。问题是返回的偏移量与作为参数给定的时间戳不匹配。e、 g.当生成时间戳19:00:00时,我得到时间16:38:00生成的消息。
5条答案
按热度按时间gojuced71#
Kafka1.10确实支持时间戳,尽管使用它来做你想做的事情仍然是一个小挑战。但是如果你知道你想从哪个时间戳开始读,直到你想读为止,那么你就可以轮询消息直到那个时间,并且停止消费。
ojsjcaue2#
正如其他答复所指出的那样,Kafka的旧版本只有一种将时间Map到偏移量的近似方法。然而,自从kafka 0.10.0(2016年5月发布)以来,kafka为每个主题都保留了一个时间索引。这将允许您有效地从时间到精确的偏移量。您可以使用kafkaconsumer#offsetsfortimes方法访问此信息。
kip-33设计讨论页上有关于如何实现基于时间的索引的更多细节。
hmtdttj43#
Kafka消费api方法
getOffsetsByTimes()
可用于此,可从0.10.0或更高版本获得。参见javadoc。kmb7vmvb4#
向您展示代码:
qnzebej05#
在Kafka目前没有办法得到一个偏移量,对应于一个特定的时间戳-这是设计。正如jaykreps的日志文章顶部所描述的,偏移量数字为日志提供了一种时间戳,它与挂钟时间分离。用偏移量作为时间的概念,你就可以知道任何两个系统是否处于一致的状态,只要知道它们读到什么偏移量就行了。从来没有任何关于不同服务器上的不同时钟时间,闰年,白天光节省时间,时区等混淆。它有点不错。。。
现在。。。总之,如果你知道你的服务器在某个时候坏了,那么实际上,你真的很想知道相应的偏移量。你可以靠近。kafka机器上的日志文件是根据它们开始写入的时间命名的,并且存在一个kafka工具(我现在找不到),可以让您知道哪些偏移与这些文件关联。如果你想知道确切的时间戳,那么你必须在发送给Kafka的消息中对时间戳进行编码。