我希望能够阅读一个主题的特定分区中的消息,以及另一个主题中的消息,就像我使用简单的 Consumer
.
self.consumer = AvroConsumer(conf)
parts = [TopicPartition('p_topic', 13),
TopicPartition('p_topic', 14)
self.consumer.assign(parts)
self.consumer.subscribe(['test_topic'])
有些“客户机”在“p\u topic”分区中生成消息,有些(我创建的)在“test\u topic”分区中生成如下消息:
self.p.produce('test_topic', msg)
我无法将这两个与上面显示的代码集成。我在“test\u topic”中生成的消息抛出:
File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/avro/__init__.py", line 115, in poll
decoded_value = self._serializer.decode_message(message.value())
File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/avro/serializer/message_serializer.py", line 214, in decode_message
raise SerializerError("message does not start with magic byte")
SerializerError
如何使用 AvroConsumer
?
1条答案
按热度按时间9njqaruj1#
根据关于“魔法字节”的错误,在这个主题中产生的任何东西都没有完成
AvroProducer
.