kafka producer客户端python代码不工作

lvmkulzt  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(225)

我有我的zookeeper和kafka集群都在本地主机上设置和运行。我有一个生产者和消费者运行形式的命令提示符以及运行良好。现在,当我尝试在python脚本上使用kafka python库时,出现以下错误。
Kafka制作人.py

from kafka import KafkaProducer

from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

try:
    future = producer.send('topic', b'From program')
    record_metadata = future.get(timeout=60)
    producer.flush()
except KafkaError as exc:
    print("Exception during getting assigned partitions - {}".format(exc))
    # Decide what to do if produce request failed...
    pass

错误:
获取分配的分区时出现异常-kafkatimeouterror:包含1条记录的topicpartition(topic='topic',partition=0)的批已过期:自批创建加上延迟时间后已过30秒
这个主题存在,我已经证实了。甚至我的kafka-consumer.py也可以正常工作,因为我通过kafka-console-producer.bat——代理列表从命令提示符检查了它localhost:9092 --topic 主题。
Kafka-消费者.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic',auto_offset_reset='earliest',group_id=None,bootstrap_servers=['localhost:9092'])
for msg in consumer:
    print (msg)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题