Python Postgres psycopg2 ThreadedConnectionPool exhausted

cotxawn7 asked on 2023-02-08 in PostgreSQL

I have looked through several "too many clients" related topics but still can't solve my problem, so I have to ask again, for my specific case.
Basically, I set up my local Postgres server and need to do tens of thousands of queries, so I use the Python psycopg2 package.

import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

def do_one_query(inputS, inputT):
    conn = tcp.getconn()
    c = conn.cursor()

    q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"   

    c.execute(q)
    all_results = c.fetchall()
    for row in all_results:
        return row
    tcp.putconn(conn, close=True)

cnt=0
for idx, row in df.iterrows():

    cnt+=1
    with ThreadPoolExecutor(max_workers=1) as pool:
        ret = pool.submit(do_one_query,  row["S"], row["T"])
        print(ret.result())
    print(cnt)

This code runs well when df is small. If I repeat df 10000 times, I get an error saying the connection pool is exhausted. I thought the connections I used had been closed by this line:

tcp.putconn(conn, close=True)

but I guess they were not actually closed? How can I solve this problem?


p1tboqfb1#

I had trouble finding really detailed information on how ThreadedConnectionPool works. https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html isn't bad, but it turns out its claim that getconn blocks until a connection becomes available is incorrect. Checking the code, all that ThreadedConnectionPool adds is a lock around the AbstractConnectionPool methods to prevent race conditions. If you try to use more than maxconn connections at any point, the *connection pool exhausted* PoolError is raised.
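A minimal sketch of that failure mode (DSN as defined in the question):

from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(1, 2, DSN)  # maxconn = 2
a = pool.getconn()
b = pool.getconn()
c = pool.getconn()  # raises PoolError: connection pool exhausted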
If you want something a bit simpler than the accepted answer, further wrapping the methods in a semaphore that blocks until a connection becomes available will do the trick:

from threading import Semaphore
from psycopg2.pool import ThreadedConnectionPool
class ReallyThreadedConnectionPool(ThreadedConnectionPool):
    def __init__(self, minconn, maxconn, *args, **kwargs):
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, *args, **kwargs):
        self._semaphore.acquire()
        try:
            conn = super().getconn(*args, **kwargs)
        except:
            self._semaphore.release()
            raise
        return SemaphoreManagedConnection(self, conn)

class SemaphoreManagedConnection:
    def __init__(self, pool, conn):
        self.pool = pool
        self.conn = conn

    def cursor(self, *args, **kwargs):
        return self.conn.cursor(*args, **kwargs)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Return the connection to the pool (closing it first would make
        # putconn discard it), then free up a semaphore slot.
        try:
            self.pool.putconn(self.conn)
        finally:
            self.pool._semaphore.release()
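
A usage sketch under the question's setup (the pool size and query shape are illustrative):

pool = ReallyThreadedConnectionPool(1, 20, DSN)

def do_one_query(state, title):
    with pool.getconn() as managed:  # blocks instead of raising PoolError
        cur = managed.cursor()
        cur.execute(
            'SELECT * from eridata where "State" = %s and "Title" = %s limit 1;',
            (state, title),
        )
        return cur.fetchone()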

pxyaymoc2#

You need to use a queue on top of your pool.
Something like the following should work:

import gevent, sys, random, psycopg2, logging
from contextlib import contextmanager
from gevent.queue import Queue
from gevent.socket import wait_read, wait_write
from psycopg2.pool import ThreadedConnectionPool
from psycopg2 import extensions, OperationalError
logger = logging.getLogger(__name__)

poolsize = 100  #number of max connections
pdsn = '' # put your dsn here

if sys.version_info[0] >= 3:
    integer_types = (int,)
else:
    import __builtin__
    integer_types = (int, __builtin__.long)

class ConnectorError(Exception):
    """ This is a base class for all CONNECTOR related exceptions """
    pass

#simplified calls etc. db.fetchall(SQL, arg1, arg2...)
def cursor(): return Pcursor()
def fetchone(PSQL, *args): return Pcursor().fetchone(PSQL, *args)
def fetchall(PSQL, *args): return Pcursor().fetchall(PSQL, *args)
def execute(PSQL, *args): return Pcursor().execute(PSQL, *args)

#singleton connection pool, gets reset if a connection is bad or drops
_pgpool = None
def pgpool():
    global _pgpool
    if not _pgpool:
        try:
            _pgpool = PostgresConnectionPool(maxsize=poolsize)
        except psycopg2.OperationalError as exc:
            _pgpool = None
    return _pgpool

class Pcursor(object):

    def __init__(self, **kwargs):
        #in case of a lost connection lets sit and wait till it's online
        global _pgpool
        if not _pgpool:
            while not _pgpool:
                try:
                    pgpool()
                except:
                    logger.debug('Attempting Connection To Postgres...')
                    gevent.sleep(1)

    def fetchone(self, PSQL, *args):
        with _pgpool.cursor() as cursor:
            try:
                cursor.execute(PSQL, args)
            except TypeError:
                cursor.execute(PSQL, args[0])
            except Exception as exc:
                print(sys._getframe().f_back.f_code)
                print(sys._getframe().f_back.f_code.co_name)
                logger.warning(str(exc))
            logger.debug(cursor.query)
            return cursor.fetchone()

    def fetchall(self, PSQL, *args):
        with _pgpool.cursor() as cursor:
            try:
                cursor.execute(PSQL, args)
            except TypeError:
                cursor.execute(PSQL, args[0])
            except Exception as exc:
                print(sys._getframe().f_back.f_code)
                print(sys._getframe().f_back.f_code.co_name)
                logger.warning(str(exc))
            logger.debug(cursor.query)
            return cursor.fetchall()

    def execute(self, PSQL, *args):
        with _pgpool.cursor() as cursor:
            try:
                cursor.execute(PSQL, args)
            except TypeError:
                cursor.execute(PSQL, args[0])
            except Exception as exc:
                print(sys._getframe().f_back.f_code)
                print(sys._getframe().f_back.f_code.co_name)
                logger.warning(str(exc))
            finally:
                logger.debug(cursor.query)
                return cursor.query

    def fetchmany(self, PSQL, *args):
        with _pgpool.cursor() as cursor:
            try:
                cursor.execute(PSQL, args)
            except TypeError:
                cursor.execute(PSQL, args[0])
            while 1:
                items = cursor.fetchmany()
                if not items:
                    break
                for item in items:
                    yield item

class AbstractDatabaseConnectionPool(object):

    def __init__(self, maxsize=poolsize):
        if not isinstance(maxsize, integer_types):
            raise TypeError('Expected integer, got %r' % (maxsize, ))
        self.maxsize = maxsize
        self.pool = Queue()
        self.size = 0

    def create_connection(self):
        #overridden by PostgresConnectionPool
        raise NotImplementedError()

    def get(self):
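        # Reuse a queued connection when one is available, or block on the
        # queue once the pool is at maxsize; otherwise create a fresh
        # connection and count it toward the pool size.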
        pool = self.pool
        if self.size >= self.maxsize or pool.qsize():
            return pool.get()

        self.size += 1
        try:
            new_item = self.create_connection()
        except:
            self.size -= 1
            raise
        return new_item

    def put(self, item):
        self.pool.put(item)

    def closeall(self):
        while not self.pool.empty():
            conn = self.pool.get_nowait()
            try:
                conn.close()
            except Exception:
                pass

    @contextmanager
    def connection(self, isolation_level=None):
        conn = self.get()
        try:
            if isolation_level is not None:
                if conn.isolation_level == isolation_level:
                    isolation_level = None
                else:
                    conn.set_isolation_level(isolation_level)
            yield conn
        except:
            if conn.closed:
                conn = None
                self.closeall()
            raise
        else:
            if conn.closed:
                raise OperationalError("Cannot commit because connection was closed: %r" % (conn, ))
        finally:
            if conn is not None and not conn.closed:
                if isolation_level is not None:
                    conn.set_isolation_level(isolation_level)
                self.put(conn)

    @contextmanager
    def cursor(self, *args, **kwargs):
        isolation_level = kwargs.pop('isolation_level', None)
        with self.connection(isolation_level) as conn:
            try:
                yield conn.cursor(*args, **kwargs)
            except:
                # reset the singleton so the next call rebuilds the pool,
                # then re-raise instead of silently swallowing the error
                global _pgpool
                _pgpool = None
                raise

class PostgresConnectionPool(AbstractDatabaseConnectionPool):
    def __init__(self,**kwargs):
        try:
            self.pconnect = ThreadedConnectionPool(1, poolsize, dsn=pdsn)
        except:
            global _pgpool
            _pgpool = None
            raise ConnectorError('Database Connection Failed')
        maxsize = kwargs.pop('maxsize', None)
        self.kwargs = kwargs
        AbstractDatabaseConnectionPool.__init__(self, maxsize)

    def create_connection(self):
        self.conn = self.pconnect.getconn()
        self.conn.autocommit = True
        return self.conn

def gevent_wait_callback(conn, timeout=None):
    """A wait callback useful to allow gevent to work with Psycopg."""
    while 1:
        state = conn.poll()
        if state == extensions.POLL_OK:
            break
        elif state == extensions.POLL_READ:
            wait_read(conn.fileno(), timeout=timeout)
        elif state == extensions.POLL_WRITE:
            wait_write(conn.fileno(), timeout=timeout)
        else:
            raise ConnectorError("Bad result from poll: %r" % state)

extensions.set_wait_callback(gevent_wait_callback)

Then you can call your connections via:

import db
db.Pcursor().execute(PSQL, arg1, arg2, arg3)

Basically I borrowed the gevent example for async postgres and modified it to support threadpooling via psycopg2.
https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py
I added the functionality of psycogreen into the module, so all you need to do is import and call the class. Each call stacks a new query onto the queue, but only uses a pool of a certain size, so you don't run out of connections. This is essentially similar to what PgBouncer does, which I think would also eliminate your problem.
https://pgbouncer.github.io/
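
For illustration, a hypothetical driver (the module above is assumed to be saved as db.py):

import gevent
import db  # the module above, assumed saved as db.py

# 1000 concurrent greenlets: the queue-backed pool caps the number of real
# Postgres connections at poolsize while the remaining queries wait in line.
jobs = [gevent.spawn(db.fetchall, "SELECT %s::int;", n) for n in range(1000)]
gevent.joinall(jobs)
print(jobs[0].value)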


jyztefdp3#

The problem here is that you don't actually return the connection to the pool; instead you discard it with

tcp.putconn(conn, close=True)

See the documentation here: http://initd.org/psycopg/docs/pool.html

If close is True, discard the connection from the pool.

So if you put 800 connections into your pool, after 801 loops you will get the "exhausted error", because your connection pool has shrunk to size zero.
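
In other words (using the question's tcp pool and conn), the default close=False keeps the connection reusable:

tcp.putconn(conn)              # returns the connection to the pool for reuse
tcp.putconn(conn, close=True)  # discards it; done 800 times, nothing is left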


fnx2tebb4#

I think the reason you get PoolError("exhausted connections") may be that you return before closing the connection when all_results is not None, so the pooled connection is never put back:

def do_one_query(inputS, inputT):
    ...
    for row in all_results: 
        return row   # <-- returns before putconn when all_results is not None
    tcp.putconn(conn, close=True)

for idx, row in df.iterrows():

    cnt+=1
    with ThreadPoolExecutor(max_workers=1) as pool:
        ret = pool.submit(do_one_query,  row["S"], row["T"])
        print(ret.result())
    print(cnt)
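
A sketch of the corrected function, moving putconn into a finally block so the early return can no longer skip it (pool and query kept from the question):

def do_one_query(inputS, inputT):
    conn = tcp.getconn()
    try:
        c = conn.cursor()
        c.execute("""SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;""")
        return c.fetchone()
    finally:
        tcp.putconn(conn)  # always runs; close=False keeps the connection pooled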

I made an ugly implementation that tries to reconnect and get a new connection when the pool is exhausted or the connection is lost, like this:

import uuid

from psycopg2 import OperationalError
from psycopg2.pool import PoolError, ThreadedConnectionPool

class PostgresConnectionPool:
    def __init__(self, minconn, maxconn, *args, **kwargs):
        self.pool = ThreadedConnectionPool(minconn=minconn, maxconn=maxconn, *args, **kwargs)

    def get_conn(self):
        try:
            # check if connection lost or pool exhausted
            con = self.pool.getconn()
            cur = con.cursor()
            cur.execute("select 1;")
        except (OperationalError, PoolError) as oe:
            print(f"get pg connection with err:{oe}, reconnect")
            # reconnect via the pool's private _connect (admittedly ugly)
            key = str(uuid.uuid4())
            con = self.pool._connect(key)
        return con
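
Hypothetical usage, reusing the DSN from the question:

pool = PostgresConnectionPool(1, 20, dsn=DSN)
conn = pool.get_conn()
try:
    cur = conn.cursor()
    cur.execute("SELECT 1;")
    print(cur.fetchone())
finally:
    pool.pool.putconn(conn)  # the wrapper exposes no putconn of its own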
