《Kafka的消费者》中使用celery 是否正确?

6tqwzwtp  于 2022-12-11  发布在  Apache
关注(0)|答案(1)|浏览(179)

我正在Kafka Consumer中执行一些长时间运行的任务。我想知道我做的是对还是错?我正在使用Kafka Consumer使用来自另一个服务器的消息,并且消息正在按照我的意愿进行处理。我正在将接收到的消息放入Celery队列。一切都运行良好。Here是否需要Celery?或者Kafka将其作为队列系统来处理?

_consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=['{}:{}'.format(HOST, PORT)],auto_offset_reset="earliest", value_deserializer=lambda x: ReadHelper().json_deserializer(x), group_id="mygroupZ1")
    
    for msg in _consumer:

        payload = msg.value

        print("data fetched payload------------------")
        long_running_task.delay(payload) # Does here need Celery task to put in?
e3bfsja2

e3bfsja21#

Kafka消费者需要定期轮询Kafka。如果你运行一些阻塞函数,那么它最终会停止消费者并重新平衡消费者组,然后可能会重新启动,除非另一个消费者接管。因此,如果队列在内存中是本地的,它不能保证消费者会处理它刚刚消费的数据。换句话说,如果发生了重新平衡,你可能会处理重复的事件。
否则,如果您推到一个不同的队列,延迟该事件,然后不等待,您将冒着使该队列不堪重负的风险,并且您需要实现反压力,例如_consumer.pause(),并等待队列耗尽,然后从Kafka恢复轮询。您也无法管理Kafka记录偏移,例如确定队列进程失败,因此可能会丢失数据,因为它只是“发射后就不管”

相关问题