consumer = topic.get_simple_consumer(consumer_group="example")
partition_offset_pairs = [(p, get_offset_for_partition(p)) for p in consumer.partitions.itervalues()]
# because we passed in a consumer_group the new offsets will be saved in Kafka
consumer.reset_offsets(partition_offsets=partition_offset_pairs)
1条答案
按热度按时间vzgqcmou1#
你需要打电话
reset_offsets()
. 例如:(其中
get_offset_for_partition()
是您定义的函数)。或者对于单个分区主题:相同的
reset_offsets()
方法在上也可用BalancedConsumer
&ManagedBalanceConsumer
也要上课。注意,作为kafka设计的一部分,每个主题分区只能独立地保证消息的顺序。