我已经研究了几个“太多的客户”相关的主题,但仍然不能解决我的问题,所以我不得不再次问这个问题,为我的具体情况。
基本上,我设置了我的本地Postgres服务器,需要执行成千上万的查询,所以我使用了Python psycopg2package。
import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor
df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times
DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)
def do_one_query(inputS, inputT):
conn = tcp.getconn()
c = conn.cursor()
q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"
c.execute(q)
all_results = c.fetchall()
for row in all_results:
return row
tcp.putconn(conn, close=True)
cnt=0
for idx, row in df.iterrows():
cnt+=1
with ThreadPoolExecutor(max_workers=1) as pool:
ret = pool.submit(do_one_query, row["S"], row["T"])
print ret.result()
print cnt
这段代码在df很小的情况下运行得很好。如果我重复df 10000次,我会得到一个错误消息,说连接池耗尽。我以为我使用的连接已经被这行代码关闭了:
tcp.putconn(conn,close=True)但我猜它们实际上并没有关闭?我该如何解决这个问题?
4条答案
按热度按时间p1tboqfb1#
我很难找到关于ThreadedConnectionPool如何工作的真正详细的信息。https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html并不坏,但事实证明它声称getconn阻塞直到连接可用是不正确的。检查代码,所有ThreadedConnectionPool添加的都是围绕AbstractConnectionPool方法的锁,以防止出现争用情况。如果在任何时候尝试使用超过maxconn的连接,将引发 * 连接池耗尽 * PoolError。
如果您想要比the accepted answer简单一些的方法,那么进一步将方法 Package 在一个信号量中,提供阻塞,直到连接可用,这样就可以达到目的:
pxyaymoc2#
您需要在池的顶部使用队列。
类似下面这样的方法应该有效:
然后您可以通过以下方式呼叫您的连接:
基本上,我借用了异步postgres的gevent示例,并对其进行了修改,以通过pyscopg2支持线程池。
https://github.com/gevent/gevent/blob/master/examples/psycopg2_pool.py
我在模块中添加了psycogreen的功能,因此您所需要做的就是导入并调用该类。每次调用该类都会在队列中堆叠一个新查询,但只使用特定大小的池。这样您就不会耗尽连接。这本质上类似于PGBouncer的功能,我认为这也会消除您的问题。
https://pgbouncer.github.io/
jyztefdp3#
这里的问题是,您实际上没有将连接返回到池,而是使用
请参阅此处的文档http://initd.org/psycopg/docs/pool.html
因此,如果您将800个连接放入池中,在801个循环之后,您将得到“exhausted error”(耗尽错误),因为您的连接池大小为零。
fnx2tebb4#
我认为您得到PoolError(“exhausted connections”)的原因可能是当all_results不是None时,您在关闭连接之前返回。因此,连接池已耗尽
我做了一个丑陋的实现,当耗尽或连接丢失时,尝试重新连接以获得新的连接,如下所示