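I'm trying to write to Kafka 0.7.2 using brod with gevent on Python 2.7.
This is the error message I get. My guess is that it's caused by blocking. brod supports tornado, but I'm using gevent.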
No handlers could be found for logger "brod.socket"
Traceback (most recent call last):
File "/var/chef/cache/src/gevent/gevent/greenlet.py", line 328, in run
result = self._run(*self.args,**self.kwargs)
File "worker_server.py", line 204, in execute_kafka_pipe
kafka.produce(topic,payload)
File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/base.py", line 287, in produce
return self._write(request, callback)
File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 98, in _write
return self._write(data, callback, retries)
File "/usr/local/lib/python2.7/dist-packages/brod-0.3.2-py2.7.egg/brod/blocking.py", line 89, in _write
wrote_length += self._socket.send(data)
File "/var/chef/cache/src/gevent/gevent/socket.py", line 441, in send
self._wait(self._write_event)
File "/var/chef/cache/src/gevent/gevent/socket.py", line 292, in _wait
assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
AssertionError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x1dece60>>
<Greenlet at 0x1d4b9b0: execute_kafka_pipe('topic-spend', '{"enode": 1, "city": "Cairns", "dl": "en", "wnode)> failed with AssertionError
I also tried gevent-kafka, but it depends on gevent-zookeeper.
When I try to connect to ZooKeeper, I get the following message:
Traceback (most recent call last):
File "/home/ubuntu/workspace/rtbhui-devops/servers/worker_server.py", line 68, in <module>
framework = gevent_zookeeper.ZookeeperFramework('localhost:2181', 10)
File "/usr/local/lib/python2.7/dist-packages/gevent_zookeeper/framework.py", line 241, in __init__
self.client = ZookeeperClient(hosts, timeout)
File "/usr/local/lib/python2.7/dist-packages/gevent_zookeeper/client.py", line 211, in __init__
self._event = gevent.core.event(
AttributeError: 'module' object has no attribute 'core'
Exception AttributeError: "'ZookeeperClient' object has no attribute '_event'" in <bound method ZookeeperClient.__del__ of <gevent_zookeeper.client.ZookeeperClient object at 0x274ded0>> ignored
Is there a Python library that lets me produce messages to Kafka while using gevent?
2 Answers
wswtfjt71#
You can try https://github.com/hmahmood/kafka-python/tree/gevent-impl
It is currently submitted as a pull request against the mainline kafka-python library:
https://github.com/mumrah/kafka-python/pull/145
vom3gejh2#
Try pykafka.
That said, you should also be able to configure kafka-python to work with gevent by passing in the selector that gevent uses.
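As for the AssertionError in the question: gevent raises it when a second greenlet tries to wait on a socket that another greenlet is already waiting on, which happens when several greenlets share one producer. Whichever client you use, one option is to serialize access to the shared producer. Below is a minimal sketch, not brod's own API: `SerializedProducer` and `FakeProducer` are hypothetical names, and the only thing assumed about the real client is a `produce(topic, payload)` method as seen in the traceback. The demo uses stdlib threads so it runs anywhere; under gevent you would presumably pass a `gevent.lock.BoundedSemaphore(1)` as the lock instead.

```python
import threading


class SerializedProducer(object):
    """Hypothetical wrapper: lets only one caller at a time use the producer."""

    def __init__(self, producer, lock=None):
        # `producer` is any object exposing produce(topic, payload)
        # (assumed interface, modelled on the call in the traceback).
        self._producer = producer
        # Assumption: under gevent, pass gevent.lock.BoundedSemaphore(1) here
        # so that waiting greenlets yield instead of blocking the hub.
        self._lock = lock if lock is not None else threading.Lock()

    def produce(self, topic, payload):
        # Hold the lock for the whole write so two callers never
        # touch the underlying socket at the same time.
        with self._lock:
            return self._producer.produce(topic, payload)


class FakeProducer(object):
    """Stand-in for a real Kafka client, used only to exercise the wrapper."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, payload):
        self.sent.append((topic, payload))
        return len(payload)


fake = FakeProducer()
safe = SerializedProducer(fake)

# Five concurrent callers share one producer through the wrapper.
workers = [
    threading.Thread(target=safe.produce, args=("topic-spend", "msg-%d" % i))
    for i in range(5)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

The alternative is to give each greenlet its own producer (and therefore its own socket), which avoids the lock at the cost of more connections to the broker.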