我有一个kafka主题,就是接收二进制数据(原始数据包捕获数据)。我可以确认它确实是使用kafka cli工具登陆数据的。我每秒收到多条消息。
kafka-console-consumer.sh --zookeeper svr:2181 --topic test
但是当我使用kafka python时,我无法检索任何消息。这个 poll
方法只是不返回任何结果。
(Pdb) consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
(Pdb) consumer.poll(5000)
{}
我已经能够使用kafkapython从只包含文本字符串的单独主题中提取消息。
我很好奇,是否kafka python内部删除了这些消息,因为它们是二进制的,并且没有通过某种验证。我怎样才能更深入地了解为什么无法检索任何消息?
1条答案
按热度按时间yyyllmsg1#
问题是发送到主题的数据使用了snappy压缩。我所要做的就是安装一个额外的模块来处理snappy。
不幸的是,使用我在问题中概述的代码,它只是不返回任何数据,而不是告诉我问题与压缩有关。
作为比较,我使用了旧的consumerapi,它确实正确地报告了问题,并引导我找到了这个解决方案。