bounty将于明天到期。回答此问题可获得+200的声望奖励。Cyril N.正在寻找标准答案。
我会尽可能完整地写这一期。
我正在使用Sanic,一个ASGI Python框架,并在此基础上构建了一个数据库管理器。
这个数据库管理器使用ContextVar
在代码中的任何地方给予对当前db
示例的访问。
下面是与数据库相关的代码:
database.py
# -*- coding:utf-8 -*-
from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool, NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sentry_sdk import push_scope, capture_exception
from sanic import Sanic
class EngineNotInitialisedError(Exception):
pass
class DBSessionContext:
def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
self.read_session = read_session
self.write_session = write_session
self.commit_on_exit = commit_on_exit
self.token = None
self._read = None
self._write = None
def _disable_flush(self, *args,**kwargs):
raise NotImplementedError('Unable to flush a read-only session.')
async def close(self, exc_type=None, exc_value=None, traceback=None):
if self._write:
try:
if exc_value and getattr(exc_value, 'status_code', 500) > 300:
await self._write.rollback()
else:
await self._write.commit()
except Exception as e:
pass
try:
await self._write.close()
except OperationalError as e:
if e.orig.args[0] != 2013: # Lost connection to MySQL server during query
raise e
if self._read:
try:
await self._read.close()
except OperationalError as e:
if e.orig.args[0] != 2013: # Lost connection to MySQL server during query
raise e
def set_token(self, token):
self.token = token
@property
def read(self) -> Session:
if not self._read:
self._read = self.read_session()
self._read.flush = self._disable_flush
return self._read
@property
def write(self) -> Session:
if not self._write:
self._write = self.write_session()
return self._write
class AsyncSession(SQLAlchemyAsyncSession):
async def execute(self, statement,**parameters):
return await super().execute(statement, parameters)
async def first(self, statement,**parameters):
executed = await self.execute(statement,**parameters)
return executed.first()
async def all(self, statement,**parameters):
executed = await self.execute(statement,**parameters)
return executed.all()
class DBSession:
def __init__(self):
self.app = None
self.read_engine = None
self.read_session = None
self.write_engine = None
self.write_session = None
self._session = None
self.context = ContextVar("context", default=None)
self.commit_on_exit = True
def init_app(self, app: Surge) -> None:
self.app = app
self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)
self.read_engine = create_async_engine(
self.app.config.get('DATABASE_READ_URL'),
connect_args={
'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
},
**{
'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
'poolclass': QueuePool, # will be used to create a connection pool instance using the connection parameters given in the URL
# if pool_class is not NullPool:
# if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
# the number of connections to allow in connection pool “overflow”
'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
# the number of connections to keep open inside the connection pool
'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
# this setting causes the pool to recycle connections after the given number of seconds has passed
'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
# number of seconds to wait before giving up on getting a connection from the pool
'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
}
)
# @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
self.read_session = sessionmaker(
bind=self.read_engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=False,
autocommit=False
)
self.write_engine = create_async_engine(
self.app.config.get('DATABASE_WRITE_URL'),
connect_args={
'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
},
**{
'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
'poolclass': NullPool, # will be used to create a connection pool instance using the connection parameters given in the URL
}
)
self.write_session = sessionmaker(
bind=self.write_engine,
expire_on_commit=False,
class_=AsyncSession,
autoflush=True
)
async def __aenter__(self):
session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
session_ctx.set_token(self.context.set(session_ctx))
return session_ctx
async def __aexit__(self, exc_type, exc_value, traceback):
session_ctx = self.context.get()
try:
await session_ctx.close(exc_type, exc_value, traceback)
except Exception:
pass
self.context.reset(session_ctx.token)
@property
def read(self) -> Session:
return self.context.get().read
@property
def write(self) -> Session:
return self.context.get().write
@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
'''Listener for Pool checkout events that pings every connection before using.
Implements pessimistic disconnect handling strategy. See also:
http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''
cursor = dbapi_con.cursor()
try:
cursor.execute("SELECT 1")
except exc.OperationalError as ex:
if ex.args[0] in (2006, # MySQL server has gone away
2013, # Lost connection to MySQL server during query
2055): # Lost connection to MySQL server at '%s', system error: %d
raise exc.DisconnectionError() # caught by pool, which will retry with a new connection
else:
raise
cursor.close()
db = DBSession()
此配置允许我运行类似于以下内容的代码:
from models import User
from database import db
@app.get('/user')
async def get_user(request):
async with db:
users = User.find_all() # Special function in the Model that returns all users
return json({'items': [{'id': x.id for x in users}])
当代码退出async with
时,来自DBSession类的__aenter__
和大多数__aexit__
(以及后续的DBSessionContext
)处理所有事情,包括任何异常(如果发生)。
我遇到的问题是,有时,我会在Sentry报告以下错误:
垃圾收集器正在尝试清除连接〈AdaptedConnection〈asyncmy.connection.Connection object at 0x7f290c50dd30〉〉。缺少“终止”功能的异步dbapi上不支持此功能,因为在此阶段无法执行IO来重置连接。请关闭不再使用的所有连接,调用close()
或使用上下文管理器来管理它们的生存期。
我不明白为什么会发生这种情况。更奇怪的是,我经常在一个根本不使用数据库的函数调用上得到这种错误(async with db
仍然存在,但内部根本不使用数据库)。
该函数的内容是网络调用:
import requests
@app.get('/notify')
async def get_user(request):
async with db:
requests.post('https://service.com/notify', data={'some': 'data'})
return text('ok')
以下是我的假设,但我希望对这个问题有一个更清晰的看法:
- 假设1:由于读取操作使用的是QueuePool,因此可能对
close
的__aexit__
调用并没有真正关闭连接,因此,连接保持打开状态,从而导致稍后出现“垃圾收集器正在尝试清理连接”问题。 - 假设二:该连接在
check_connection
上建立,并保持打开状态,从而导致“垃圾收集器”问题
你知道我为什么会有“垃圾收集器”的问题吗?
我正在使用:
- 卫生==22.9.0
- SQL炼金术[异步]==1.4.41
- 异步==0.2.5
3条答案
按热度按时间vqlkdk9b1#
这一行可能会导致
await session_ctx.close(exc_type, exc_value, traceback)
问题。请尝试将其更改为
await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
。这是在7月份添加到
SQLAlchemy
代码库中的。此更改已在
/asyncio/engine.py
和/asyncio/session.py
中实现。以下是代码中的更改:其他参考文献:
SQLAlchemy issue 8145
该变更已添加至版本1.4.40,发布日期为2022年8月8日
mdfafbf12#
一个简单而快速的解决方案可能是通过将其 Package 在try/except块中并通过打印输出来处理特定的错误来检查它。
w46czmvw3#
您不管理
requests.post
的生存期,这不是阻止调用close吗?虽然我确实认为aexit应该关闭会话,但我真的不明白您为什么要这样做:
async with db:
。会话的目的是什么?总体来说,实现得不错。