Python garbage collector is trying to clean up connection (asyncmy)

qaxu7uf2 asked on 2022-10-30 in Python

The bounty expires tomorrow. Answers to this question are eligible for a +200 reputation bounty. Cyril N. is looking for a canonical answer.

I'll try to make this question as complete as possible.
I'm using Sanic, an ASGI Python framework, and I built a database manager on top of it.
This database manager uses a ContextVar to give access to the current db instance anywhere in the code.
Here is the database-related code:
database.py


# -*- coding:utf-8 -*-

from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool, NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sentry_sdk import push_scope, capture_exception
from sanic import Sanic

class EngineNotInitialisedError(Exception):
    pass

class DBSessionContext:
    def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
        self.read_session = read_session
        self.write_session = write_session
        self.commit_on_exit = commit_on_exit

        self.token = None
        self._read = None
        self._write = None

    def _disable_flush(self, *args, **kwargs):
        raise NotImplementedError('Unable to flush a read-only session.')

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        if self._write:
            try:
                if exc_value and getattr(exc_value, 'status_code', 500) > 300:
                    await self._write.rollback()
                else:
                    await self._write.commit()
            except Exception as e:
                pass

            try:
                await self._write.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

        if self._read:
            try:
                await self._read.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

    def set_token(self, token):
        self.token = token

    @property
    def read(self) -> Session:
        if not self._read:
            self._read = self.read_session()
            self._read.flush = self._disable_flush

        return self._read

    @property
    def write(self) -> Session:
        if not self._write:
            self._write = self.write_session()

        return self._write

class AsyncSession(SQLAlchemyAsyncSession):
    async def execute(self, statement, **parameters):
        return await super().execute(statement, parameters)

    async def first(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.first()

    async def all(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.all()

class DBSession:
    def __init__(self):
        self.app = None
        self.read_engine = None
        self.read_session = None
        self.write_engine = None
        self.write_session = None
        self._session = None
        self.context = ContextVar("context", default=None)
        self.commit_on_exit = True

    def init_app(self, app: Sanic) -> None:
        self.app = app
        self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)

        self.read_engine = create_async_engine(
            self.app.config.get('DATABASE_READ_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
          **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': QueuePool,  # will be used to create a connection pool instance using the connection parameters given in the URL
                # if pool_class is not NullPool:

                # if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
                'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
                # the number of connections to allow in connection pool “overflow”
                'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
                # the number of connections to keep open inside the connection pool
                'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
                # this setting causes the pool to recycle connections after the given number of seconds has passed
                'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
                # number of seconds to wait before giving up on getting a connection from the pool
                'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
            }
        )

        # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
        self.read_session = sessionmaker(
            bind=self.read_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )

        self.write_engine = create_async_engine(
            self.app.config.get('DATABASE_WRITE_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
          **{
                'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
                'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
                'poolclass': NullPool,  # will be used to create a connection pool instance using the connection parameters given in the URL
            }
        )

        self.write_session = sessionmaker(
            bind=self.write_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=True
        )

    async def __aenter__(self):
        session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        session_ctx = self.context.get()
        try:
            await session_ctx.close(exc_type, exc_value, traceback)
        except Exception:
            pass

        self.context.reset(session_ctx.token)

    @property
    def read(self) -> Session:
        return self.context.get().read

    @property
    def write(self) -> Session:
        return self.context.get().write

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            raise exc.DisconnectionError()  # caught by pool, which will retry with a new connection
        else:
            raise

    cursor.close()

db = DBSession()

This setup allows me to run code like the following:

from models import User
from database import db

@app.get('/user')
async def get_user(request):
    async with db:
        users = await User.find_all()  # Special function in the Model that returns all users
        return json({'items': [{'id': x.id} for x in users]})

When the code exits the `async with` block, `__aenter__` and, mostly, `__aexit__` from the DBSession class (and then DBSessionContext) take care of everything, including any exception that may have occurred.
The problem I'm having is that, from time to time, I get the following error reported in Sentry:
The garbage collector is trying to clean up connection <AdaptedConnection <asyncmy.connection.Connection object at 0x7f290c50dd30>>. This feature is unsupported on asyncio dbapis that lack a "terminate" feature, since no IO can be performed at this stage to reset the connection. Please close out all connections when they are no longer used, calling close() or using a context manager to manage their lifecycle.
I don't understand why this happens. Even stranger, I often get this error on calls to a function that doesn't use the database at all (the `async with db` is still there, but nothing inside the block touches the database).
The content of that function is a network call:

import requests

@app.get('/notify')
async def get_user(request):
    async with db:
        requests.post('https://service.com/notify', data={'some': 'data'})

    return text('ok')

Here are my hypotheses, but I'm hoping to get a clearer understanding of the problem:

  • Hypothesis 1: since reads go through a QueuePool, the close() call made in __aexit__ may not actually close the connection; the connection then stays open, which later triggers the "garbage collector is trying to clean up connection" issue. (A quick way to check this is sketched right after this list.)
  • Hypothesis 2: the connection is established by check_connection and kept open, which causes the "garbage collector" issue.
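
One way to test hypothesis 1 (this is a debugging sketch, not part of the original code; the extra listeners and the pool_log name are mine) is to log the pool's checkout/checkin/close events next to the existing check_connection listener and see whether read connections are actually checked back in when __aexit__ runs:

import logging

from sqlalchemy import event
from sqlalchemy.pool import Pool

pool_log = logging.getLogger("pool-debug")

@event.listens_for(Pool, "checkout")
def log_checkout(dbapi_con, con_record, con_proxy):
    # fires when a connection is handed out to a session
    pool_log.info("checkout %r", dbapi_con)

@event.listens_for(Pool, "checkin")
def log_checkin(dbapi_con, con_record):
    # fires when a connection is returned to the pool, which is what
    # close() on a pooled session is expected to trigger
    pool_log.info("checkin %r", dbapi_con)

@event.listens_for(Pool, "close")
def log_close(dbapi_con, con_record):
    # fires when the underlying DBAPI connection is really closed
    pool_log.info("close %r", dbapi_con)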

Any idea why I'm getting this "garbage collector" issue?
I'm using:

  • sanic==22.9.0
  • sqlalchemy[asyncio]==1.4.41
  • asyncmy==0.2.5

vqlkdk9b #1

The problem is probably caused by this line: `await session_ctx.close(exc_type, exc_value, traceback)`.
Try changing it to `await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))`.
A fix along these lines was added to the SQLAlchemy codebase in July.
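For reference, a minimal sketch of what that change would look like in the DBSession.__aexit__ from the question (only the modified method is shown; everything else stays as posted):

import asyncio

class DBSession:
    # ... everything else unchanged ...

    async def __aexit__(self, exc_type, exc_value, traceback):
        session_ctx = self.context.get()
        try:
            # shield() keeps the commit/close from being interrupted if the
            # surrounding task gets cancelled, so the asyncmy connection is
            # not left for the garbage collector to reclaim.
            await asyncio.shield(session_ctx.close(exc_type, exc_value, traceback))
        except Exception:
            pass

        self.context.reset(session_ctx.token)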

The change was made in /asyncio/engine.py and /asyncio/session.py.

Other references:
SQLAlchemy issue 8145
The change was included in version 1.4.40, released on August 8, 2022.

mdfafbf1 #2

A quick and simple workaround could be to wrap it in a try/except block and handle the specific error (for example by printing it out) so you can inspect what is going on.

w46czmvw #3

You are not managing the lifetime of requests.post; isn't that what is blocking the call to close()?
While I do think __aexit__ should close the session, I don't really understand why you wrap this in `async with db:` at all. What is the session used for here?
Overall, nice implementation.
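On the requests.post point: it is a synchronous call, so it blocks the event loop for the whole duration of the HTTP request while the session context is open. A hedged sketch of a non-blocking variant of the /notify handler, assuming the same app, db and text objects as in the question (asyncio.to_thread needs Python 3.9+; an async HTTP client such as httpx would work just as well):

import asyncio

import requests

@app.get('/notify')
async def notify(request):
    async with db:
        # run the blocking requests call in a worker thread so the event
        # loop stays free while the session context is open
        await asyncio.to_thread(
            requests.post, 'https://service.com/notify', data={'some': 'data'}
        )

    return text('ok')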
