我试图从python中读取kafka,但receive message是none,cli中没有错误。我通过putty使用端口转发到目标主机,然后通过telnet测试端口—这很好。此外,我在debian(wsl)上使用kafkacat,它工作正常!
kafkacat -C -b localhost:9092 -t topic1 -p 0 -o beginning -s avro -r http://localhost:28081
我正在使用pycharm,我的代码在下面的文本中。如何调试?
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition
from confluent_kafka.avro.serializer import SerializerError
topics = ['topic1', 'topic2']
c = AvroConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'smallest',
'schema.registry.url': 'http://localhost:28081',
'api.version.request': True
})
c.subscribe(topics)
tp = TopicPartition(topics[0], 0, 0)
c.assign([tp])
while True:
try:
msg = c.poll(1)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
print('Message None')
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
作为
1条答案
按热度按时间gjmwrych1#
我要做的第一件事是确保使用
kafka-avro-console-consumer
工具。然后在应用程序中,您可以尝试提高日志级别:
您可以在这里看到不同的参数:https://github.com/edenhill/librdkafka/blob/master/configuration.md
但我相信您的问题与分配分区的方式有关。如果你使用
subscribe
分区由集群自动分配给使用者。您可以在订阅时添加回调,您可以看到哪些分区分配给了您的使用者,但您不必自己执行。看到了吗https://docs.confluent.io/3.1.1/clients/confluent-kafka-python/index.html#confluent_kafka.consumer.subscribe