gpt4 book ai didi

Python Postgres psycopg2 ThreadedConnectionPool 耗尽

转载 作者:行者123 更新时间:2023-11-29 11:30:24 29 4
gpt4 key购买 nike

我在这里查看了几个与“太多客户”相关的主题,但仍然无法解决我的问题,所以我必须针对我的具体情况再次询问。

基本上,我设置了本地 Postgres 服务器并需要进行数万次查询,所以我使用了 Python psycopg2 包。这是我的代码:

import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

def do_one_query(inputS, inputT):
conn = tcp.getconn()
c = conn.cursor()

q = r"SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"

c.execute(q)
all_results = c.fetchall()
for row in all_results:
return row
tcp.putconn(conn, close=True)

cnt=0
for idx, row in df.iterrows():

cnt+=1
with ThreadPoolExecutor(max_workers=1) as pool:
ret = pool.submit(do_one_query, row["S"], row["T"])
print ret.result()
print cnt

代码运行良好,df 很小。如果我重复 df 10000 次,我会收到错误消息,指出连接池耗尽.虽然我使用的连接已被这条线关闭:

tcp.putconn(conn, close=True)但我想实际上他们并没有关闭?我该如何解决这个问题?

最佳答案

我一直在努力寻找有关 ThreadedConnectionPool 如何工作的真正详细信息。 https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html不错,但事实证明它声称 getconn 阻塞直到连接可用是不正确的。检查代码,所有 ThreadedConnectionPool 添加的是围绕 AbstractConnectionPool 方法的锁,以防止竞争条件。如果在任何时候尝试使用超过 maxconn 个连接,连接池耗尽将引发 PoolError。

如果你想要比 the accepted answer 更简单的东西,进一步将方法包装在信号量中,提供阻塞直到连接可用应该可以解决问题:

from psycopg2.pool import ThreadedConnectionPool as _ThreadedConnectionPool
from threading import Semaphore

class ThreadedConnectionPool(_ThreadedConnectionPool):
def __init__(self, minconn, maxconn, *args, **kwargs):
self._semaphore = Semaphore(maxconn)
super().__init__(minconn, maxconn, *args, **kwargs)

def getconn(self, *args, **kwargs):
self._semaphore.acquire()
try:
return super().getconn(*args, **kwargs)
except:
self._semaphore.release()
raise

def putconn(self, *args, **kwargs):
try:
super().putconn(*args, **kwargs)
finally:
self._semaphore.release()

# closeall is inherited as is. This means the Semaphore does
# not get reset, but neither do the core structures for
# maintaining the pool in the original ThreadedConnectionPool
# so a closed pool is not intended to be reused once closed.

请注意,ConnectionPools,无论是标准的还是线程的,都只带有 putconn、getconn 和 closeall 这三个方法,没有像上下文管理这样的奇特之处。因此,以上内容应涵盖所有现有功能。

关于Python Postgres psycopg2 ThreadedConnectionPool 耗尽,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48532301/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com