loop exit:这个操作将永远阻塞(kafka生产者python在scrapinghub)

jm2pwxwz  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(294)

我试着用Kafka建立一个服务器到服务器的通信。我的目标是使用kafka将数据从报废中心发送到python。
我在公共ip上建立了一个Kafka服务器。这通常是可行的,但是过了一段时间(在发送300或500条消息之后),生产者和消费者有时会失败,并在生产者端出现错误“loop exit:this operation will block forever”,然后在消费者端出现错误“no brokers available”。
通常会有2个生产者和2个消费者分别发送到不同的主题,每个主题只有一个主题分区和一个复制。目前我已经给出了以下配置。
num.network.threads=16 num.io.threads=16 所有其他配置仍为默认配置。
这就是我在制作人那边遇到的错误。,

[kafka.producer.sender] Uncaught error in kafka producer I/O thread Less
    Traceback (most recent call last):
     File "/app/python/lib/python2.7/site-packages/kafka/producer/sender.py", 
line 60, in run
       self.run_once()
     File "/app/python/lib/python2.7/site-packages/kafka/producer/sender.py", 
       line 147, in run_once
       (self._client.send(node_id, request)
     File "/app/python/lib/python2.7/site-packages/kafka/client_async.py",
       line 497, in send
       return self._conns[node_id].send(request)
     File "/app/python/lib/python2.7/site-packages/kafka/conn.py", 
       line 663, in send
       return self._send(request)
     File "/app/python/lib/python2.7/site-packages/kafka/conn.py", 
        line 682, in _send
       sent_bytes = self._sock.send(data[total_sent:])
     File "/app/python/lib/python2.7/site-packages/gevent/socket.py", 
       line 443, in send
       self._wait(self._write_event)
     File "/app/python/lib/python2.7/site-packages/gevent/socket.py", 
       line 300, in _wait
       self.hub.wait(watcher)
     File "/app/python/lib/python2.7/site-packages/gevent/hub.py", 
       line 348, in wait
       result = waiter.get()
     File "/app/python/lib/python2.7/site-packages/gevent/hub.py", 
       line 575, in get
       return self.hub.switch()
     File "/app/python/lib/python2.7/site-packages/gevent/hub.py", 
       line 338, in switch
       return greenlet.switch(self)
    LoopExit: This operation would block forever

如果以后尝试从用户端重新连接,将出现以下错误:

Traceback (most recent call last):
  File "manage.py", line 9, in <module>
    execute_from_command_line(sys.argv)
  ...
  File "/home/ubuntu/project/kafka_handle/consumer.py", 
  line 80, in __init__
    auto_commit_interval_ms=10000, auto_offset_reset=offset_reset)
  File "/home/.../python2.7/site-packages/kafka/consumer/group.py",
  line 324, in __init__
    self._client = KafkaClient(metrics=self._metrics,**self.config)
  File "/home/.../python2.7/site-packages/kafka/client_async.py", 
  line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/home/.../python2.7/site-packages/kafka/client_async.py", 
  line 826, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题