我对Kafka和KafkaPython相当陌生。在安装了kafka python之后,我尝试了一个简单的消费代码实现-http://kafka-python.readthedocs.io/en/master/usage.html
我一直在kafka的bin目录中编写消费代码,并尝试从那里运行python代码。但是,我得到以下错误:
回溯(最后一次调用):文件“kafkaconsumer.py”,第4行,在下一个返回类型(self)文件“/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py”,第559行,在下一个返回类型(self)文件“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”,第915行,在next return next(self.\iterator)file“/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py”第876行,在self.fetcher:file“/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py”第559行,在next return type(self).next(self)file“/usr/local/lib/python2.7/dist packages/kafka/consumer/fetcher.py”第520行,在next return next(self.\u iterator)file“/usr/local/lib/python2.7/dist packages/kafka/consumer/fetcher.py”第477行,在self.\u unpack\u message\u set(tp,messages):文件“/usr/local/lib/python2.7/dist packages/kafka/consumer/fetcher.py”,第372行,在解压缩Asserthas\u snappy(),'snapy depression unsupported'Assert错误:snapy depression unsupported
这是我一直试图运行的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
print("%s:%d%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
因为,我是Kafka的新手,我很难理解我做错了什么。
1条答案
按热度按时间tzdcorbm1#
您似乎缺少python snappy,它是读取snappy格式压缩的数据所必需的。
你需要
snappy
以及snappy-devel
,您可以使用yum、apt-get等安装它,然后重试pip install python-snappy