我尝试使用Confluent Kafka在python中编写一个Kafka消费者。我可以获得所有新消息,但如果我杀死并重新启动消费者,则不会获得任何旧消息
def confluent_kafka_consumer(app):
with app.app_context():
import config
app.logger.info('Running Confluent Kafka consumer')
consumer_config = {
'bootstrap.servers': F'{config.Config.KAFKA_BROKER_URL}:{config.Config.KAFKA_BROKER_PORT}',
'group.id': 'myGroupId',
'auto.offset.reset': 'earliest',
'enable.auto.commit': 'false',
'max.poll.interval.ms': '86400000'
}
try:
consumer = Consumer(consumer_config)
consumer.subscribe(['updates'])
while True:
# read single message at a time
msg = consumer.poll(0)
if msg is None:
gevent.sleep(config.DevelopmentConfig.KAFKA_CONSUMER_THREAD_SLEEP_TIME)
continue
if msg.error():
print("Error reading message : {}".format(msg.error()))
continue
# You can parse message and save to data base here
callstr = msg.value().decode('utf-8')
print(callstr)
except Exception as ex:
print("Kafka Exception : {}", ex)
finally:
print("closing consumer")
consumer.close()
我试着把groupId设置成不同的东西,重新启动生产者,zookeper...
1条答案
按热度按时间7cwmlq891#
你第一次运行代码的时候得到了现有的数据吗?如果是的话,这就是消费者群体的工作方式;它们在重新启动时保持该位置。
您将需要一个新的group.id,或者可以在外部使用
kafka-consumer-groups
CLI命令(您需要下载Kafka)来重置偏移。或者,生产者代码可能只是丢失了数据。例如,你在使用
kafka-console-consumer
时看到了你所期望的数据吗?重启任何东西都无济于事。