如何使用confluent kakfa python包在Kafka中使用最近5分钟的数据?

rmbxnbpk  于 2023-04-29  发布在  Apache
关注(0)|答案(1)|浏览(133)

我目前能够在Kafka中使用最新的实时数据,但是否有一种方法可以以优化的方式使用每个分区中最后5分钟的数据?
目前的方法是将auto.offset.reset设置为earliest,然后消耗,直到它到达位于5分钟时间戳中的每个部分的偏移结束。但这需要很长时间。
如果有一种方法可以做到这一点,但在相反的顺序,以减少消费时间,这将是非常有帮助的!

0yg35tkg

0yg35tkg1#

confluent_kafka.Consumer.offsets_for_times()函数提供了一种机制来获取TopicPartition对象的最早偏移量,其中时间戳大于或等于以毫秒为单位提供的POSIX时间戳。
您可以在订阅主题时为on_assign事件注册一个回调函数,该函数使用Consumer.offsets_for_times()Consumer.assign()在使用消息之前将分配的分区上的偏移量重置为所需的位置。
例如,你可以这样做:

import datetime
import math
from confluent_kafka import Consumer, TopicPartition

def get_time_offset():
  '''Returns the POSIX epoch representation (in milliseconds) of the datetime 5 minutes prior to being called'''
  delta = datetime.timedelta(minutes=5)  
  now = datetime.datetime.now(datetime.timezone.utc)  # TZ-aware object to simplify POSIX epoch conversion
  prior = now - delta  
  return math.floor(prior.timestamp() * 1000)  # convert seconds to milliseconds for Consumer.offsets_for_times()

def reset_offsets(consumer, partitions):
  '''Resets the offsets of the provided partitions to the first offsets found corresponding to timestamps greater than or equal to 5 minutes ago.'''
  time_offset = get_time_offset()
  search_partitions = [TopicPartition(p.topic, p.partition, time_offset) for p in partitions]  # new TPs with offset= time_offset
  time_offset_partitions = consumer.offsets_for_times(search_partitions)  # find TPs with timestamp of earliest offset >= time_offset
  consumer.assign(time_offset_partitions)  # (re-)set consumer partition assignments and start consuming

topics = ['my-topic-of-interest']

c = Consumer({
  'bootstrap.servers': 'server-fqdn',
  'group.id': 'group-name'
})

c.subscribe(topics, on_assign=reset_offsets)  # reset_offsets() called when partition assignment received after c.poll()

# Process all messages from reset offsets (5 min. ago) to present (and ongoing)
while True:
  try:
    msg = c.poll()  # first call triggers execution of on_assign callback function, resetting offsets
  except RuntimeError as e:
    print("Consumer is closed.")
    break
  # process message and commit...

c.close()

相关问题