while True:
messages = [] # Store all messages
crs = [] # Store all consumer records
tpd = consumer.poll(timeout_ms=60000, max_records=1)
[ crs.extend(tp) for tp in tpd.values() ] # List of cr's
[ messages.extend([json.loads(cr.value)]) for cr in crs ]
print messages
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
# dummy poll
consumer.poll()
# go to end of the stream
consumer.seek_to_end()
# start iterate
for message in consumer:
print(message)
consumer.close()
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
messages = consumer.poll()
data = []
for msg in messages:
for value in messages[msg]:
#Add just the values to the list
data.append(value[6])
4条答案
按热度按时间ohtdti5x1#
下面是一种方便的方法,可以将轮询返回的所有消息都放在列表中:
8yparm6h2#
你不必担心
seekToEnd()
到日志的末尾。请记住,你首先需要订阅一个主题,然后才能寻找。而且,订阅是懒惰的。因此,您需要添加一个“虚拟投票”才能进行搜索。
vfwfrxfs3#
谢谢,
真管用!
这是我的代码之一的简化版本:
文档说明poll()方法与迭代器接口不兼容,我猜这就是我在脚本末尾的循环中使用的方法。不过,从最初的测试来看,这段代码似乎工作正常。
使用安全吗?还是我误解了文件?
谢谢
ar5n3qh54#
在回答你的问题时:
我的理解是
consumer.poll()
一本字典被退回。所以,当我想调查信息时,我用了一个循环来浏览字典。我相信你所做的就是用
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
然后用看起来你并不是真的只得到了500个投票结果。您可以通过添加
max_poll_records=5
你的Kafka消费者配置。然后,当您运行代码时,如果打印出的消息超过5条,您就可以知道您没有使用poll功能。希望有帮助!