我调用一个函数,它从kafka producer发送一些数据,但是在它发送之后,我返回一个不返回的响应。代码在返回时被卡住。有人知道发生了什么吗?
我的代码如下:,
def postEvent(eventData):
print("The eventData is...",eventData)
timestamp = datetime.now().__format__("%Y-%m-%d %H:%M:%S")
try:
producer = KafkaProducer(bootstrap_servers=["host:port"])
data = json.dumps(eventData).encode('utf-8')
try:
kafkaResponse = producer.send('streamTest', data)
response ={'time': str(timestamp), 'kafkaResponse':kafkaResponse.get(),
'postResult': 'true'}
print('kafaka response is...', response)
except ConnectionAbortedError:
response ={'time': str(timestamp), 'postResult': 'false'}
except kafka.errors.KafkaTimeoutError:
response ={'time': str(timestamp), 'postResult': 'false'}
print('kafaka response is...', response)
return response
except kafka.errors.NoBrokersAvailable:
response = {'Response':'Kafka Errors... NoBrokersAvailable'}
print('kafaka response ', response)
return response
1条答案
按热度按时间csga3l581#
你的问题不清楚是哪一个
return
声明它挂在那里。我测试了你的代码,它在kafka0.10.0.1代理和
kafka-python
1.3.5.这可能是kafka集群的网络问题,所以您可能挂起的两个地方是:1。
kafkaResponse.get()
当你等待Future
解决2。没有可用的代理,而代理超时。如果您传入多个代理,请记住,它们将需要在每次超时抛出NoBrokersAvailable
错误。