在使用者重新启动后,kafka python读取最后生成的消息

ruyhziif  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(676)

我正在使用kafka python来使用kafka队列(kafka版本0.10.2.0)中的消息。特别是我用的是Kafka苏美尔类型。如果使用者停止并且在一段时间后重新启动,我想从最新生成的消息重新启动,也就是删除在使用者关闭期间生成的所有消息。我怎样才能做到这一点?
谢谢

ohtdti5x

ohtdti5x1#

下面是一种方便的方法,可以将轮询返回的所有消息都放在列表中:

while True:
  messages = [] # Store all messages
  crs = [] # Store all consumer records
  tpd = consumer.poll(timeout_ms=60000, max_records=1)
  [ crs.extend(tp) for tp in tpd.values() ] # List of cr's
  [ messages.extend([json.loads(cr.value)]) for cr in crs ]
  print messages
8yparm6h

8yparm6h2#

你不必担心 seekToEnd() 到日志的末尾。
请记住,你首先需要订阅一个主题,然后才能寻找。而且,订阅是懒惰的。因此,您需要添加一个“虚拟投票”才能进行搜索。

consumer.subscribe(...)
consumer.poll() // dummy poll
consumer.seekToEnd()

// now enter your regular poll-loop
vfwfrxfs

vfwfrxfs3#

谢谢,
真管用!
这是我的代码之一的简化版本:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)

# dummy poll

consumer.poll()

# go to end of the stream

consumer.seek_to_end()

# start iterate

for message in consumer:
    print(message)

consumer.close()

文档说明poll()方法与迭代器接口不兼容,我猜这就是我在脚本末尾的循环中使用的方法。不过,从最初的测试来看,这段代码似乎工作正常。
使用安全吗?还是我误解了文件?
谢谢

ar5n3qh5

ar5n3qh54#

在回答你的问题时:
我的理解是 consumer.poll() 一本字典被退回。所以,当我想调查信息时,我用了一个循环来浏览字典。

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
    for value in messages[msg]:
       #Add just the values to the list
       data.append(value[6])

我相信你所做的就是用 consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True) 然后用


# start iterate

for message in consumer:
    print(message)

看起来你并不是真的只得到了500个投票结果。您可以通过添加 max_poll_records=5 你的Kafka消费者配置。然后,当您运行代码时,如果打印出的消息超过5条,您就可以知道您没有使用poll功能。
希望有帮助!

相关问题