I want to use Kafka's MultiProcessConsumer, but I get the error below. The error seems to be related to multithreading in Python.
This is the code I am using (simple.py):
```
from kafka import SimpleProducer, SimpleClient, SimpleConsumer, MultiProcessConsumer

# To consume messages
client = SimpleClient('localhost:9092')
consumer = MultiProcessConsumer(client, "my-group", "testing_topic", num_procs=3)
for message in consumer:
    # message is raw byte string -- decode if necessary!
    # e.g., for unicode: message.decode('utf-8')
    print(message)

client.close()
```
Running the code above produces this error:
```
$ python simple.py
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/var/users/ec2-user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/managers.py", line 749, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/var/users/ec2-user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/connection.py", line 614, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
```
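One possible reading of the traceback, offered as an assumption and not a confirmed diagnosis: both frames are in the standard library's `multiprocessing` package, and the pattern (no thread-local `connection`, then a failed `s.connect(address)`) is what a `multiprocessing.Manager` proxy produces when it is called after its manager process has already gone away. The stdlib-only sketch below reproduces that pattern without Kafka. The function name `call_proxy_after_shutdown` is made up for illustration, and the `"fork"` start method is an assumption that holds on Linux and macOS but not on Windows.

```python
import multiprocessing
import threading


def call_proxy_after_shutdown():
    """Call a Manager proxy after its manager is gone; return the error name."""
    # Assumption: a Unix-like OS where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")
    manager = ctx.Manager()
    exit_event = manager.Event()  # proxy object backed by the manager process

    # Stop the manager process while the proxy is still referenced.
    manager.shutdown()

    result = {}

    def worker():
        # This thread has no cached connection to the manager, so the proxy
        # has to open a new one, and the manager is no longer listening.
        try:
            exit_event.set()
            result["error"] = None
        except (OSError, EOFError) as exc:
            result["error"] = type(exc).__name__

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result["error"]


if __name__ == "__main__":
    print(call_proxy_after_shutdown())
```

If the same thing is happening in simple.py, the failing call would be a cleanup hook that runs at interpreter exit and touches such a proxy too late. Stopping the consumer explicitly with `consumer.stop()` before the script ends (for example in a `try`/`finally` around the loop) might avoid it, but that is untested here.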