我正在 Python 3.7 中运行一段代码,所用的库是通过以下命令安装的:
pip3 install kafka
这是代码:
import random
import time, calendar
from random import randint
from kafka import KafkaProducer
from kafka import errors
from json import dumps
from time import sleep
def write_data(producer, data_cnt=20000, interval=0.5):
    """Publish randomly generated payment records to the ``payment_msg`` topic.

    Each record is a dict with the keys ``createTime`` (local time formatted
    as ``%Y-%m-%d %H:%M:%S``), ``orderId``, ``payAmount``, ``payPlatform``
    and ``provinceId``.

    Args:
        producer: Object exposing ``send(topic, value=...)`` and ``flush()``;
            in this script it is the producer returned by
            ``create_producer``.
        data_cnt: Number of records to publish. Defaults to 20000, the value
            that used to be hard-coded.
        interval: Seconds to pause after each record. Defaults to 0.5, the
            value that used to be hard-coded. A value <= 0 disables the
            pause.
    """
    max_price = 100000
    topic = "payment_msg"
    # Order ids start from the current UTC epoch second and grow by one per
    # record, so ids within a single run are strictly increasing.
    order_id = calendar.timegm(time.gmtime())
    for _ in range(data_cnt):
        order_id += 1
        record = {
            "createTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            "orderId": order_id,
            # Uniform amount in [0, max_price).
            "payAmount": max_price * random.random(),
            # Platform 0 is drawn about 90% of the time, platform 1 otherwise.
            "payPlatform": 0 if random.random() < 0.9 else 1,
            "provinceId": randint(0, 6),
        }
        producer.send(topic, value=record)
        if interval > 0:
            sleep(interval)
    # NOTE(review): kafka-python documents send() as asynchronous; flush so
    # that records still buffered are delivered before the caller exits.
    producer.flush()
def create_producer():
    """Connect to the Kafka broker at ``kafka:9092`` and return the producer.

    Up to six connection attempts are made. After each attempt that fails
    with ``errors.NoBrokersAvailable`` the function waits 10 seconds before
    trying again.

    Returns:
        A ``KafkaProducer`` whose values are serialized as UTF-8 JSON.

    Raises:
        RuntimeError: If none of the six attempts reaches a broker.
    """

    def _to_json_bytes(payload):
        # Message values are sent as UTF-8 encoded JSON text.
        return dumps(payload).encode('utf-8')

    print("Connecting to Kafka brokers")
    attempts_left = 6
    while attempts_left > 0:
        attempts_left -= 1
        try:
            client = KafkaProducer(
                bootstrap_servers=['kafka:9092'],
                value_serializer=_to_json_bytes,
            )
        except errors.NoBrokersAvailable:
            print("Waiting for brokers to become available")
            sleep(10)
        else:
            print("Connected to Kafka")
            return client
    raise RuntimeError("Failed to connect to brokers within 60 seconds")
if __name__ == '__main__':
    # Script entry point: obtain a connected producer, then hand it to the
    # data generator.
    write_data(create_producer())
但是出现了一个错误
Traceback (most recent call last):
File "generate_source_data.py", line 22, in <module>
from kafka import KafkaProducer
File "/usr/local/lib/python3.7/site-packages/kafka/__init__.py", line 23, in <module>
from kafka.producer import KafkaProducer
File "/usr/local/lib/python3.7/site-packages/kafka/producer/__init__.py", line 4, in <module>
from .simple import SimpleProducer
File "/usr/local/lib/python3.7/site-packages/kafka/producer/simple.py", line 54
return '<SimpleProducer batch=%s>' % self.async
有什么问题?
1条答案
按热度按时间1l5u6lss1#
我在这里找到了答案:https://github.com/dpkp/kafka-python/issues/1566
pip3 uninstall kafka
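先用上面的命令卸载 kafka 包,然后通过 pip3 install kafka-python 安装 kafka-python。原因是 async 在 Python 3.7 中成为保留关键字,而旧的 kafka 包仍把它用作属性名(self.async),所以在导入时会触发语法错误。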