python在kafka使用者中的控制消息偏移

rggaifut  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(487)

实现是用python实现的。使用合流Kafka。
我有一个consumer对象来轮询来自kafka主题的消息。消息用于其他大对象的进一步处理,由于大小的原因,我无法在每次消息处理之后备份对象。
我定期转储对象,然后手动提交使用者。下面是我实现的示例代码。

from confluent_kafka import Consumer, KafkaError, TopicPartition

c = Consumer({
    'bootstrap.servers': 'myserver',
    'group.id': 'mygroup',
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])

offsets = {} 

for i in range(10):
    msg = c.poll()

    if msg.error():
        continue

    par = msg.partition()
    off = msg.offset() 
    offsets[p] = off 

c.commit(async=False) 

print(offsets)

当我第二次运行这个代码时,我希望消息偏移量(如果来自同一个分区)应该是下一个偏移量,即+1,来自打印的上一个偏移量。
但是补偿费提高了很多。还有几百个。
我还尝试手动分配如下位置:

lst_part = []

for par, off in offsets.items():
    lst_part.append(TopicPartition('mytopic', par, off))

c.assign(lst_part)

# then start polling messages

新轮询的消息不是指定的偏移量+1。

z31licg0

z31licg01#

c.commit(async=False) 将提交客户端已将消息返回给应用程序的所有已使用分区 poll() 打电话。
如果希望对提交的偏移量进行更细粒度的控制,可以传递显式 [TopicPartition(..)] 列表到 commit() (确保提交最后一条消息\u offset+1)或禁用 auto.offset.store 并显式调用 store_offsets() 为将来存储的消息/偏移量 commit() 打电话。
请注意 store_offsets() 仅在master上可用,在合流kafkapython客户机的发布版本中还不可用,但很快就会提供。

相关问题