- 此问题已在此处有答案**:
How do I move to a specific offset in a Kafka consumer without running into a ValueError?(1个答案)
12天前关闭
我使用的是python 3.9.16和kafka-python 2.0.2版本。我的MacBook Pro iOS 11.6.5。
我还在接触Kafka的作品,所以我完全有可能做错了事情。
我试图做的是测试寻求与我的消费者抵消的情况下,有些东西没有得到处理,我必须回去重新阅读一条消息。
不管怎样,我一直遇到这个错误消息。我甚至不确定为什么会发生这种情况,因为有时我可以处理偏移量并且它工作正常,其他时候,它会给我这样的消息:
ValueError: Error encountered when attempting to convert value: b'' to struct format: '<built-in method unpack of _struct.Struct object at 0x10bb669f0>', hit error: unpack requires a buffer of 4 bytes
当它工作时,我可以在pdb中看到这一点,这有点证明了这些值存在于主题中供我使用:
(Pdb)
> /Users/username/kafka/tkCons.py(41)<module>()
-> print ("{}, {}".format(blah.offset, blah.value))
(Pdb)
10, b'{"number": 10}'
> /Users/username/kafka/tkCons.py(40)<module>()
-> for blah in consumer:
(Pdb)
我希望我能缩小我在测试过程中所做的事情,但我不能确定我添加/注解的代码行有助于使它工作或使它给我上面的错误。既然我不能100%确定引擎盖下发生了什么,我在寻找周围的东西不知何故影响了zookeeper?我该怎么做才能让引擎盖下的东西开心呢?这是我的代码以防万一。
from kafka import KafkaConsumer, TopicPartition
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(#'my-topic333', 'my-topic222', 'my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
myTP = TopicPartition('my-topic333', 0)
import pdb
pdb.set_trace()
consumer.assign([myTP])
print ("this is the consumer assignment: {}".format(consumer.assignment()))
print ("before this is my position: {} ".format(consumer.position(myTP)))
consumer.seek(myTP, 50)
#consumer.seek_to_beginning()
print ("after seeking this is my position: {} ".format(consumer.position(myTP)))
for blah in consumer:
print ("{}, {}".format(blah.offset, blah.value))
1条答案
按热度按时间zpqajqem1#
首先,
blah.value
可以是None
,但它应该打印None,然后,不会引发与反序列化相关的ValueError...您需要显示完整的堆栈跟踪,并将偏移量打印在与值不同的单独一行上,以便您可以看到错误实际发生的位置,或者查看包含先前成功偏移量的任何日志...以防有什么东西没处理好我得回去重新读一遍
我不建议使用seek。
相反,让一个失败的处理器引发一个致命的异常,并停止你的Python进程。然后,手动处理任何成功处理的数据的偏移提交(或者你愿意容忍重复的一批偏移,假设你以幂等的方式处理它们),这意味着也禁用自动偏移提交。然后,当您执行此操作并重新启动消费者组时,应用程序将在最后一次成功处理的偏移后自动拾取,并且不需要手动查找。
我是不是在找什么东西影响了zookeeper?
API不使用使用者组,所以除非您还使用订阅和提交函数。