我试图使用 kafka-python
库和有多个代理产生的数据频率很高,但在kafka消费端,我需要大约5秒的处理时间,所以在处理第一条消息后,我应该得到最新的消息,而不是最后一次提交偏移量后的下一条消息。
我试过设置 enable_auto_commit=False
,和 auto_offset_reset="latest"
我也试过设置随机组id,我也试过设置 group_id = None
. 这样做的唯一效果是,我只在开始时获取最新数据,但之后每个数据都按偏移量的顺序出现,而不是队列的结尾或最新数据。
consumer = KafkaConsumer(bootstrap_servers=kafka_brokers_address,
api_version=(2, 3, 0),
group_id='abcd',
value_deserializer=lambda v:json.loads(v.decode('utf-8')),
enable_auto_commit=False,
auto_offset_reset="latest")
consumer_rpnl.assign([TopicPartition('topic', 0)])
c = next(consumer)
## also tried
for c in consumer:
print(c.values)
1条答案
按热度按时间bvjveswy1#
如何从以下位置移到最后一个位置的示例:https://github.com/dpkp/kafka-python/issues/1405