我有一个扩展flask应用程序的psycopg ConnectionPool
:
def init_pool(cfg, connection_class, name) -> ConnectionPool:
pool = ConnectionPool(
conninfo=f"postgresql://{cfg['user']}:{cfg['password']}@{cfg['host']}/{cfg['database']}",
min_size=cfg['min_pool_size'],
max_size=cfg['max_pool_size'],
connection_class=connection_class,
kwargs={"autocommit": True, "options": "-c idle_session_timeout=0"},
name=name
)
pool.wait(timeout=30.0)
with pool.connection() as conn:
# retrieve a connection to register the user-defined types
# in the global scope of the application (*default*)
register_composite_types(conn)
return pool
def register_composite_types(conn):
""" Convert postgresql type -> python
* Register the shared composite types in the conn scope.
* Utilizes the built-in psycopg factory call
"""
with conn.cursor() as cur:
for (t,) in cur.execute("select shared_types()"):
if not conn.broken:
info = CompositeInfo.fetch(conn, t)
register_composite(info, context=None)
else:
raise LevelsDbException(project_id=None, filename=None,
message="Failed to initialize: Broken connection")
我使用了一个自定义任务类来注入flask应用上下文(根据flask+celery的最新文档):
def celery_init(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
# instantiated once for each task. Each task serves multiple task requests.
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(celeryconfig)
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
我有一个shared_task
,它通过flask代理current_app
使用池:
with current_app.inspection_db_pool.connection() as conn:
field_data = db.get_file_fields( conn
, project_id
, path
, hashstr
)
# field_data (a Cursor returned from `conn.execute`)
fields: list[dict] = fields_from_levels_db(field_data)
# ...
def fields_from_levels_db(file_fields: Iterable) -> list[dict]:
""" file_fields is a Cursor """
return [ dict(
idx = field.field_idx,
purpose = field.purpose,
...
levels = field.levels
) for (field,) in file_fields ]
shared_task
函数在flask应用程序中按预期工作。然而,作为一个使用FlaskTask
配置的celery worker,我得到了以下错误(field_idx
应该是一个dict键):
AttributeError: 'str' object has no attribute 'field_idx'
我的故障排除工作包括通过self.flask_app.inspection_pool
引用池。我也犯了同样的错误。
一般来说,这种类型的错误发生在postgresql用户类型没有注册和/或在当前范围内可用时。我是否正确地注册了相关作用域中的用户类型?
1条答案
按热度按时间z3yyvxxp1#
在celery上下文中,依赖全局作用域来维护初始化的适配器是失败的。
为了确保池中的每个连接都有对适配器的有效引用,我使用了
ConnectionPool
configure
设置。该设置是一个回调,它在使连接在池中可用之前改变连接。回调包括使用context=conn
对psycopg.types.composite.register_composite
的调用。另一种看起来很有前途的方法是利用从
Connection
继承的可选类的使用。这是一条死胡同。我包括一个自定义类,只是为了将池与应用程序中的其他池区分开来。