kafka avroconsumer使用偏移量\u从时间戳消费\u次

7ivaypg9  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(376)

尝试使用合流的\u kafka.avroconsumer来使用给定时间戳中的消息。

if flag:

    # creating a list
    topic_partitons_to_search = list(
        map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))

    print("Searching for offsets with %s" % topic_partitons_to_search)
    offsets = c.offsets_for_times(topic_partitons_to_search, timeout=1.0)
    print("offsets_for_times results: %s" % offsets)

    for x in offsets:
        c.seek(x)
    flag=False

控制台返回此

Searching for offsets with [TopicPartition{topic=my_topic2,partition=0,offset=1543584425,error=None}]
offsets_for_times results: [TopicPartition{topic=my_topic2,partition=0,offset=0,error=None}]
{'name': 'Hello'}
{'name': 'Hello'}
{'name': 'Hello1'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Hello3'}
{'name': 'Offset 8'}
{'name': 'Offset 9'}
{'name': 'Offset 10'}
{'name': 'Offset 11'}
{'name': 'New'}

这些是我的主题2的分区0中的所有消息(在分区1中没有任何消息),我们应该不会得到任何消息,因为我们没有从当前时间(time.time())生成的消息。我希望能够使用 time.time() - 60000 在过去60000毫秒内获取所有信息

pgvzfuti

pgvzfuti1#

pythons time.time()返回从epoch开始的秒数,偏移量\u for \u times使用从epoch开始的毫秒数,因此当我发送秒数时,它计算的日期比今天早得多,这意味着我们应该包括我的所有偏移量。

相关问题