rabbitmq 如何检测Celery连接故障并切换到故障切换然后返回?

blpfk2vs  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(147)

因此,我们的用例可能超出了Celery所能做的范围,但我想我应该问...

使用案例

我们计划使用托管/托管RabbitMQ集群支持Celery的代理。我们希望确保我们的应用程序没有停机时间(显然),因此我们正在尝试找出当上游集群发生灾难性故障,整个集群不可用时,我们如何处理该事件。
我们的想法是,我们有一个备用的Rabbit集群,当连接断开时,我们可以自动切换Celery使用该连接。
与此同时,Celery正在确定主集群是否已启动并运行,当主集群启动并运行时,所有发布者都将重新连接到主集群,工作者将清空备份集群,当备份集群为空时,将切换回主集群。

问题

我遇到的困难是捕获连接失败,因为它似乎发生在celery 深处,因为异常没有出现在应用程序中。
我可以看到Celery有一个BROKER_FAILOVER_STRATEGY配置属性,它将处理初始交换,但它(似乎)只在发生故障转移时使用,这不适合我们在备份时交换回主服务器的用例。
我也遇到过Celery的“引导步骤”,但这些都是在Celery自己的“Connection”引导步骤之后应用的,这也是抛出异常的地方。
我有一种感觉,考虑到我一直在寻找的限制,这种方法可能不是最好的方法,但是有人知道我将如何覆盖默认的Connection引导步骤或通过不同的方法实现这一点吗?

omqzjyyz

omqzjyyz1#

它很老了,但可能对某些人有用。我用的是FastApi和Celery 5.2。
run_api.py文件:

import uvicorn

if __name__ == "__main__":
    port=8893
    print("Starting API server on port {}".format(port))
    uvicorn.run("endpoints:app", host="localhost", port=port, access_log=False)

endpoints.py 档案:

import threading
import time
import os
from celery import Celery
from fastapi import FastAPI
import itertools   
import random 

# Create object for fastAPI

app = FastAPI()

# Create and onfigure Celery to manage queus

# ----

celery = Celery(__name__)

celery.conf.broker_url = ["redis://localhost:6379"] 
celery.conf.result_backend = "redis://localhost:6379"
celery.conf.task_track_started = True
celery.conf.task_serializer = "pickle"
celery.conf.result_serializer = "pickle"
celery.conf.accept_content = ["pickle"]

def random_failover_strategy(servers):
    # The next line is necessary to work, even you don't use them:
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in itertools.repeat(None):
        # Do whatever action required here to obtain the new url
        # As an example, ra.
        ra = random.randint(0, 100)
        it = [f"redis://localhost:{str(ra)}"]
        celery.conf.result_backend = it[0]
        shuffle(it)
        yield it[0]

celery.conf.broker_failover_strategy = random_failover_strategy

# Start the celery worker. I start it in a separate thread, so fastapi can run in parallel

worker = celery.Worker()

def start_worker():
    worker.start()

ce = threading.Thread(target=start_worker)
ce.start()

# ----

@app.get("/", tags=["root"])
def root():
    return {"message": ""}

@app.post("/test")
def test(num: int):
    task = test_celery.delay(num)
    print(f'task id: {task.id}')
    return {
        "task_id": task.id,
        "task_status": "PENDING"}

@celery.task(name="test_celery", bind=True)
def test_celery(self, num):
    self.update_state(state='PROGRESS')
    print("ENTERED PROCESS", num)
    time.sleep(100)
    print("EXITING PROCESS", num)
    return {'number': num}

@app.get("/result")
def result(id: str):
    task_result = celery.AsyncResult(id)
    if task_result.status == "SUCCESS":
        return {
            "task_status": task_result.status,
            "task_num": task_result.result['number']
            }
    else:
        return {
            "task_status": task_result.status,
            "task_num": None
            }

将两个文件放在同一个文件夹中。运行python3 run_api. py。
好好享受吧!

相关问题