实现是用python实现的。使用合流Kafka。
我有一个consumer对象来轮询来自kafka主题的消息。消息用于其他大对象的进一步处理,由于大小的原因,我无法在每次消息处理之后备份对象。
我定期转储对象,然后手动提交使用者。下面是我实现的示例代码。
from confluent_kafka import Consumer, KafkaError, TopicPartition
c = Consumer({
'bootstrap.servers': 'myserver',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])
offsets = {}
for i in range(10):
msg = c.poll()
if msg.error():
continue
par = msg.partition()
off = msg.offset()
offsets[p] = off
c.commit(async=False)
print(offsets)
当我第二次运行这个代码时,我希望消息偏移量(如果来自同一个分区)应该是下一个偏移量,即+1,来自打印的上一个偏移量。
但是补偿费提高了很多。还有几百个。
我还尝试手动分配如下位置:
lst_part = []
for par, off in offsets.items():
lst_part.append(TopicPartition('mytopic', par, off))
c.assign(lst_part)
# then start polling messages
新轮询的消息不是指定的偏移量+1。
1条答案
按热度按时间z31licg01#
c.commit(async=False)
将提交客户端已将消息返回给应用程序的所有已使用分区poll()
打电话。如果希望对提交的偏移量进行更细粒度的控制,可以传递显式
[TopicPartition(..)]
列表到commit()
(确保提交最后一条消息\u offset+1)或禁用auto.offset.store
并显式调用store_offsets()
为将来存储的消息/偏移量commit()
打电话。请注意
store_offsets()
仅在master上可用,在合流kafkapython客户机的发布版本中还不可用,但很快就会提供。