我在这方面有困难 KafaConsumer
使其从开头读取,或从任何其他显式偏移量读取。
为同一主题的使用者运行命令行工具时,我确实看到了 --from-beginning
选项,否则将挂起
$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginning
如果我通过python运行它,它会挂起,我怀疑这是由不正确的使用者配置引起的
consumer = KafkaConsumer(topic_name,
bootstrap_servers=['localhost:9092'],
group_id=None,
auto_commit_enable=False,
auto_offset_reset='smallest')
print "Consuming messages from the given topic"
for message in consumer:
print "Message", message
if message is not None:
print message.offset, message.value
print "Quit"
输出:
使用来自给定主题的消息(之后挂起)
我使用的是kafkapython0.9.5,代理运行的是kafka8.2。不知道到底是什么问题。
按照dpkp的建议设置\u group \u id=none \u以模拟控制台使用者的行为。
5条答案
按热度按时间uplii1fm1#
自动补偿重置为我解决了。
sd2nnvve2#
我的观点是:打印并确保偏移量是您所期望的。通过使用
position()
以及seek_to_beginning()
,请参见代码中的注解。我无法解释:
为什么示例化之后
KafkaConsumer
,分区没有分配,这是设计的吗?闲逛就是打电话poll()
以前一次seek_to_beginning()
为什么有时候之后呢seek_to_beginning()
,第一次呼叫poll()
不返回任何数据,也不更改偏移量。代码:
输出:
xwmevbvl3#
我遇到了同样的问题:我可以在kafka控制台中接收消息,但是无法使用包获取python脚本的消息
kafka-python
.最后我想原因是我没有打电话
producer.flush()
以及producer.close()
在我的producer.py
文件中没有提到。ar5n3qh54#
auto_offset_reset='earliest'
以及group_id=None
帮我解决了。rta7y2nd5#
控制台使用者和您发布的python使用者代码之间的区别是python使用者使用使用者组来保存偏移:
group_id="test-consumer-group"
. 相反,如果您设置group_id=none,您将看到与控制台使用者相同的行为。