python-3.x psycopg停止在celery应用程序上下文中进行验证

vkc1a9a2  于 2023-10-21  发布在  Python
关注(0)|答案(1)|浏览(131)

我有一个扩展flask应用程序的psycopg ConnectionPool

def init_pool(cfg, connection_class, name) -> ConnectionPool:

    pool = ConnectionPool(
        conninfo=f"postgresql://{cfg['user']}:{cfg['password']}@{cfg['host']}/{cfg['database']}",
        min_size=cfg['min_pool_size'],
        max_size=cfg['max_pool_size'],
        connection_class=connection_class,
        kwargs={"autocommit": True, "options": "-c idle_session_timeout=0"},
        name=name
    )
    pool.wait(timeout=30.0)

    with pool.connection() as conn:
        # retrieve a connection to register the user-defined types
        # in the global scope of the application (*default*)
        register_composite_types(conn)

    return pool

def register_composite_types(conn):
    """ Convert postgresql type -> python
        * Register the shared composite types in the conn scope.
        * Utilizes the built-in psycopg factory call
    """
    with conn.cursor() as cur:
        for (t,) in cur.execute("select shared_types()"):
            if not conn.broken:
                info = CompositeInfo.fetch(conn, t)
                register_composite(info, context=None)
            else:
                raise LevelsDbException(project_id=None, filename=None,
                                        message="Failed to initialize: Broken connection")

我使用了一个自定义任务类来注入flask应用上下文(根据flask+celery的最新文档):

def celery_init(app: Flask) -> Celery:

    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # instantiated once for each task.  Each task serves multiple task requests.
    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(celeryconfig)
    celery_app.set_default()
    app.extensions["celery"] = celery_app

    return celery_app

我有一个shared_task,它通过flask代理current_app使用池:

with current_app.inspection_db_pool.connection() as conn:
    field_data = db.get_file_fields( conn
                                   , project_id
                                   , path
                                   , hashstr
                                   )

# field_data (a Cursor returned from `conn.execute`)
fields: list[dict] = fields_from_levels_db(field_data)

# ... 
def fields_from_levels_db(file_fields: Iterable) -> list[dict]:
    """ file_fields is a Cursor """
    return [ dict(
        idx = field.field_idx,
        purpose = field.purpose,
        ...
        levels = field.levels

    ) for (field,) in file_fields ]

shared_task函数在flask应用程序中按预期工作。然而,作为一个使用FlaskTask配置的celery worker,我得到了以下错误(field_idx应该是一个dict键):

AttributeError: 'str' object has no attribute 'field_idx'

我的故障排除工作包括通过self.flask_app.inspection_pool引用池。我也犯了同样的错误。
一般来说,这种类型的错误发生在postgresql用户类型没有注册和/或在当前范围内可用时。我是否正确地注册了相关作用域中的用户类型?

z3yyvxxp

z3yyvxxp1#

在celery上下文中,依赖全局作用域来维护初始化的适配器是失败的。
为了确保池中的每个连接都有对适配器的有效引用,我使用了ConnectionPoolconfigure设置。该设置是一个回调,它在使连接在池中可用之前改变连接。回调包括使用context=connpsycopg.types.composite.register_composite的调用。

def init_inspection_pool(cfg) -> ConnectionPool:
    def configure(conn: psycopg.Connection):
        register_composite_types(conn, context=conn)
        with_features(conn)

    return init_pool(cfg, InspectionConnection, 'inspection_pool',
                     configure=configure)

另一种看起来很有前途的方法是利用从Connection继承的可选类的使用。这是一条死胡同。我包括一个自定义类,只是为了将池与应用程序中的其他池区分开来。

相关问题