我有我的zookeeper和kafka集群都在本地主机上设置和运行。我有一个生产者和消费者运行形式的命令提示符以及运行良好。现在,当我尝试在python脚本上使用kafka python库时,出现以下错误。
Kafka制作人.py
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
try:
future = producer.send('topic', b'From program')
record_metadata = future.get(timeout=60)
producer.flush()
except KafkaError as exc:
print("Exception during getting assigned partitions - {}".format(exc))
# Decide what to do if produce request failed...
pass
错误:
获取分配的分区时出现异常-kafkatimeouterror:包含1条记录的topicpartition(topic='topic',partition=0)的批已过期:自批创建加上延迟时间后已过30秒
这个主题存在,我已经证实了。甚至我的kafka-consumer.py也可以正常工作,因为我通过kafka-console-producer.bat——代理列表从命令提示符检查了它localhost:9092 --topic 主题。
Kafka-消费者.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('topic',auto_offset_reset='earliest',group_id=None,bootstrap_servers=['localhost:9092'])
for msg in consumer:
print (msg)
暂无答案!
目前还没有任何答案,快来回答吧!