如何使用pykafka从主题中获取最新消息?

kmynzznz  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(742)

我一直在用pykafka向一个主题传递信息

producer.produce('test')

我想得到最新的消息。我在pykafka github页面上找到了一个解决方案,建议:

client = KafkaClient(hosts="xxxxxxx")
topic = client.topics['mytopic']
consumer = topic.get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)
LAST_N_MESSAGES = 2
offsets = [(p, op.next_offset - LAST_N_MESSAGES) for p, op in consumer._partitions.iteritems()]
consumer.reset_offsets(offsets)
consumer.consume()

但是,我不太明白这里到底发生了什么,只有在至少有两条消息已经存在的情况下,它才会得到最新的消息。
有没有更可靠的解决方案?

5sxhfpxr

5sxhfpxr1#

准确地定义“最新消息”的含义是很重要的。在具有多个分区的kafka主题中,如果不检查消息内容,实际上不可能知道每个分区上的哪个最新消息是全局最新消息。定义何时获取最新消息也很重要-是否立即获取一次?是否从最近的邮件开始消费,然后在邮件添加到主题时继续消费?是否定期只获取最新的n条消息?
上面包含的方法(我为pykafka文档编写的基础)为您提供了每个分区的最后n条消息,供您选择n条。如果只想获取最后一条消息,只需设置 LAST_N_MESSAGES 到1。基本上,配方检查每个分区消耗的最新偏移量,然后将使用者的偏移量重置为 LAST_N_MESSAGES 在那之前。当您从这一点开始消费时,您只得到分区的最后n条消息。
综上所述,如果你只是想从主题的结尾开始消费,你可以用这个:

consumer = topic.get_simple_consumer(
    auto_offset_reset=OffsetType.LATEST,
    reset_offset_on_start=True)

开始正常消费。

相关问题