我尝试在kafka中创建新主题时启动dynamic consumer,但是dynamic-launched consumer总是缺少起始/第一条消息,而是从那里开始使用消息。我正在使用kafkapython模块,并且正在使用更新的kafkaconsumer和kafkaproducer。
生产商代码为
producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
消费者的代码是
consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
请建议一些东西来解决这个问题或任何配置,我必须包括在我的生产者和消费者的示例。
1条答案
按热度按时间pkbketx91#
你能把自动偏移复位设置为最早吗。
创建新的使用者流时,它从最新偏移开始(这是auto\u offset\u reset的默认值),您将错过在使用者未启动时发送的消息。
你可以在kafka python doc里读到。相关部分如下
自动\u offset \u reset(str)–用于在发生偏移时重置偏移的策略自动中断错误:“最早”将移动到最早的可用消息,“最新”将移动到最新的消息。任何OFR值都会引发异常。默认值:“latest”。