syntaxerror:运行python kafka代码时语法无效

zazmityj  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(526)

我正在运行python3.7中的一个代码,这个库是由

pip3 install kafka

这是密码

import random
import time, calendar
from random import randint
from kafka import KafkaProducer
from kafka import errors
from json import dumps
from time import sleep

def write_data(producer):
    data_cnt = 20000
    order_id = calendar.timegm(time.gmtime())
    max_price = 100000
    topic = "payment_msg"

    for i in range(data_cnt):
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        rd = random.random()
        order_id += 1
        pay_amount = max_price * rd
        pay_platform = 0 if random.random() < 0.9 else 1
        province_id = randint(0, 6)
        cur_data = {"createTime": ts, "orderId": order_id, "payAmount": pay_amount, "payPlatform": pay_platform, "provinceId": province_id}
        producer.send(topic, value=cur_data)
        sleep(0.5)

def create_producer():
    print("Connecting to Kafka brokers")
    for i in range(0, 6):
        try:
            producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                            value_serializer=lambda x: dumps(x).encode('utf-8'))
            print("Connected to Kafka")
            return producer
        except errors.NoBrokersAvailable:
            print("Waiting for brokers to become available")
            sleep(10)

    raise RuntimeError("Failed to connect to brokers within 60 seconds")

if __name__ == '__main__':
    producer = create_producer()
    write_data(producer)

但是出现了一个错误

Traceback (most recent call last):
  File "generate_source_data.py", line 22, in <module>
    from kafka import KafkaProducer
  File "/usr/local/lib/python3.7/site-packages/kafka/__init__.py", line 23, in <module>
    from kafka.producer import KafkaProducer
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/__init__.py", line 4, in <module>
    from .simple import SimpleProducer
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/simple.py", line 54
    return '<SimpleProducer batch=%s>' % self.async

有什么问题?

1l5u6lss

1l5u6lss1#

我从这里找到了答案https://github.com/dpkp/kafka-python/issues/1566. pip3 uninstall kafka 删除kafka包并通过 pip3 install kafka-python

相关问题