我一直在用pykafka向一个主题传递信息
producer.produce('test')
我想得到最新的消息。我在pykafka github页面上找到了一个解决方案,建议:
client = KafkaClient(hosts="xxxxxxx")
topic = client.topics['mytopic']
consumer = topic.get_simple_consumer(
auto_offset_reset=OffsetType.LATEST,
reset_offset_on_start=True)
LAST_N_MESSAGES = 2
offsets = [(p, op.next_offset - LAST_N_MESSAGES) for p, op in consumer._partitions.iteritems()]
consumer.reset_offsets(offsets)
consumer.consume()
但是,我不太明白这里到底发生了什么,只有在至少有两条消息已经存在的情况下,它才会得到最新的消息。
有没有更可靠的解决方案?
1条答案
按热度按时间5sxhfpxr1#
准确地定义“最新消息”的含义是很重要的。在具有多个分区的kafka主题中,如果不检查消息内容,实际上不可能知道每个分区上的哪个最新消息是全局最新消息。定义何时获取最新消息也很重要-是否立即获取一次?是否从最近的邮件开始消费,然后在邮件添加到主题时继续消费?是否定期只获取最新的n条消息?
上面包含的方法(我为pykafka文档编写的基础)为您提供了每个分区的最后n条消息,供您选择n条。如果只想获取最后一条消息,只需设置
LAST_N_MESSAGES
到1。基本上,配方检查每个分区消耗的最新偏移量,然后将使用者的偏移量重置为LAST_N_MESSAGES
在那之前。当您从这一点开始消费时,您只得到分区的最后n条消息。综上所述,如果你只是想从主题的结尾开始消费,你可以用这个:
开始正常消费。