我正在Kafka Consumer中执行一些长时间运行的任务。我想知道我做的是对还是错?我正在使用Kafka Consumer使用来自另一个服务器的消息,并且消息正在按照我的意愿进行处理。我正在将接收到的消息放入Celery队列。一切都运行良好。Here是否需要Celery?或者Kafka将其作为队列系统来处理?
_consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=['{}:{}'.format(HOST, PORT)],auto_offset_reset="earliest", value_deserializer=lambda x: ReadHelper().json_deserializer(x), group_id="mygroupZ1")
for msg in _consumer:
payload = msg.value
print("data fetched payload------------------")
long_running_task.delay(payload) # Does here need Celery task to put in?
1条答案
按热度按时间e3bfsja21#
Kafka消费者需要定期轮询Kafka。如果你运行一些阻塞函数,那么它最终会停止消费者并重新平衡消费者组,然后可能会重新启动,除非另一个消费者接管。因此,如果队列在内存中是本地的,它不能保证消费者会处理它刚刚消费的数据。换句话说,如果发生了重新平衡,你可能会处理重复的事件。
否则,如果您推到一个不同的队列,延迟该事件,然后不等待,您将冒着使该队列不堪重负的风险,并且您需要实现反压力,例如
_consumer.pause()
,并等待队列耗尽,然后从Kafka恢复轮询。您也无法管理Kafka记录偏移,例如确定队列进程失败,因此可能会丢失数据,因为它只是“发射后就不管”