我试着用Kafka建立一个服务器到服务器的通信。我的目标是使用kafka将数据从报废中心发送到python。
我在公共ip上建立了一个Kafka服务器。这通常是可行的,但是过了一段时间(在发送300或500条消息之后),生产者和消费者有时会失败,并在生产者端出现错误“loop exit:this operation will block forever”,然后在消费者端出现错误“no brokers available”。
通常会有2个生产者和2个消费者分别发送到不同的主题,每个主题只有一个主题分区和一个复制。目前我已经给出了以下配置。
num.network.threads=16 num.io.threads=16
所有其他配置仍为默认配置。
这就是我在制作人那边遇到的错误。,
[kafka.producer.sender] Uncaught error in kafka producer I/O thread Less
Traceback (most recent call last):
File "/app/python/lib/python2.7/site-packages/kafka/producer/sender.py",
line 60, in run
self.run_once()
File "/app/python/lib/python2.7/site-packages/kafka/producer/sender.py",
line 147, in run_once
(self._client.send(node_id, request)
File "/app/python/lib/python2.7/site-packages/kafka/client_async.py",
line 497, in send
return self._conns[node_id].send(request)
File "/app/python/lib/python2.7/site-packages/kafka/conn.py",
line 663, in send
return self._send(request)
File "/app/python/lib/python2.7/site-packages/kafka/conn.py",
line 682, in _send
sent_bytes = self._sock.send(data[total_sent:])
File "/app/python/lib/python2.7/site-packages/gevent/socket.py",
line 443, in send
self._wait(self._write_event)
File "/app/python/lib/python2.7/site-packages/gevent/socket.py",
line 300, in _wait
self.hub.wait(watcher)
File "/app/python/lib/python2.7/site-packages/gevent/hub.py",
line 348, in wait
result = waiter.get()
File "/app/python/lib/python2.7/site-packages/gevent/hub.py",
line 575, in get
return self.hub.switch()
File "/app/python/lib/python2.7/site-packages/gevent/hub.py",
line 338, in switch
return greenlet.switch(self)
LoopExit: This operation would block forever
如果以后尝试从用户端重新连接,将出现以下错误:
Traceback (most recent call last):
File "manage.py", line 9, in <module>
execute_from_command_line(sys.argv)
...
File "/home/ubuntu/project/kafka_handle/consumer.py",
line 80, in __init__
auto_commit_interval_ms=10000, auto_offset_reset=offset_reset)
File "/home/.../python2.7/site-packages/kafka/consumer/group.py",
line 324, in __init__
self._client = KafkaClient(metrics=self._metrics,**self.config)
File "/home/.../python2.7/site-packages/kafka/client_async.py",
line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/home/.../python2.7/site-packages/kafka/client_async.py",
line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
暂无答案!
目前还没有任何答案,快来回答吧!