我目前能够在Kafka中使用最新的实时数据,但是否有一种方法可以以优化的方式使用每个分区中最后5分钟的数据?
目前的方法是将auto.offset.reset
设置为earliest
,然后消耗,直到它到达位于5分钟时间戳中的每个部分的偏移结束。但这需要很长时间。
如果有一种方法可以做到这一点,但在相反的顺序,以减少消费时间,这将是非常有帮助的!
我目前能够在Kafka中使用最新的实时数据,但是否有一种方法可以以优化的方式使用每个分区中最后5分钟的数据?
目前的方法是将auto.offset.reset
设置为earliest
,然后消耗,直到它到达位于5分钟时间戳中的每个部分的偏移结束。但这需要很长时间。
如果有一种方法可以做到这一点,但在相反的顺序,以减少消费时间,这将是非常有帮助的!
1条答案
按热度按时间0yg35tkg1#
confluent_kafka.Consumer.offsets_for_times()
函数提供了一种机制来获取TopicPartition
对象的最早偏移量,其中时间戳大于或等于以毫秒为单位提供的POSIX时间戳。您可以在订阅主题时为
on_assign
事件注册一个回调函数,该函数使用Consumer.offsets_for_times()
和Consumer.assign()
在使用消息之前将分配的分区上的偏移量重置为所需的位置。例如,你可以这样做: