如何在pykafka simpleconsumer中选择起始偏移量?

ndh0cuux  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(393)

在我的kafka cluster single partition主题中,我有一个简单的使用者处理所有传入的消息,如果处理的数据出错,我希望以相同的顺序重新处理来自某个偏移量(而不是开始)的所有消息,以修复不一致性并保持kafka消息的原始有序序列。
有没有办法和皮Kafka在一起?我没弄明白

vzgqcmou

vzgqcmou1#

你需要打电话 reset_offsets() . 例如:

consumer = topic.get_simple_consumer(consumer_group="example")
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.itervalues()]

# because we passed in a consumer_group the new offsets will be saved in Kafka

consumer.reset_offsets(partition_offsets=partition_offset_pairs)

(其中 get_offset_for_partition() 是您定义的函数)。或者对于单个分区主题:


# read from offset 123456

consumer = topic.get_simple_consumer()
partition = topic.partitions[0]
consumer.reset_offsets([(partition, 123456)])

相同的 reset_offsets() 方法在上也可用 BalancedConsumer & ManagedBalanceConsumer 也要上课。
注意,作为kafka设计的一部分,每个主题分区只能独立地保证消息的顺序。

相关问题