使用pykafka读取特定记录

qjp7pelc  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(202)

我想在kafka中存储大文件,使用有关记录的元数据在将来检索它们。
因此,我四处发送包含主题、分区id和偏移量的消息,然后我尝试用以下方式检索文件:

def retrieve_file_from_kafka(topic_name, partition_id, offset):
    client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
    topic = client.topics[bytes(topic_name, "UTF-8")]

    consumer = topic.get_balanced_consumer(
    consumer_group=bytes("file_retrieve" + topic_name + str(partition_id) + str(offset), "UTF-8"))
    consumer.reset_offsets([(topic.partitions[partition_id], offset)])
    return consumer.consume()

但它不起作用,只是打印:

Offset reset for partition 0 to timestamp 8 failed. Setting partition 0's internal counter to 8

这个错误是相当神秘的,它发生在复位偏移量上。当我尝试消费时,这个过程就会被卡住,等待重新平衡的锁定。我做错什么了?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题