Kafka制作人停止我的代码

f4t66c6m  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(290)

我调用一个函数,它从kafka producer发送一些数据,但是在它发送之后,我返回一个不返回的响应。代码在返回时被卡住。有人知道发生了什么吗?
我的代码如下:,

def postEvent(eventData):
    print("The eventData is...",eventData)
    timestamp = datetime.now().__format__("%Y-%m-%d %H:%M:%S")
    try:
        producer = KafkaProducer(bootstrap_servers=["host:port"])
        data = json.dumps(eventData).encode('utf-8')
        try:

            kafkaResponse = producer.send('streamTest', data)

            response ={'time': str(timestamp), 'kafkaResponse':kafkaResponse.get(), 
                       'postResult': 'true'}
            print('kafaka response is...', response)
        except ConnectionAbortedError:
                response ={'time': str(timestamp), 'postResult': 'false'}
        except kafka.errors.KafkaTimeoutError:
                response ={'time': str(timestamp), 'postResult': 'false'}
        print('kafaka response is...', response)
        return response
    except kafka.errors.NoBrokersAvailable:
        response = {'Response':'Kafka Errors... NoBrokersAvailable'}
        print('kafaka response ', response)
        return response
csga3l58

csga3l581#

你的问题不清楚是哪一个 return 声明它挂在那里。
我测试了你的代码,它在kafka0.10.0.1代理和 kafka-python 1.3.5.
这可能是kafka集群的网络问题,所以您可能挂起的两个地方是:1。 kafkaResponse.get() 当你等待 Future 解决2。没有可用的代理,而代理超时。如果您传入多个代理,请记住,它们将需要在每次超时抛出 NoBrokersAvailable 错误。

相关问题