gpt4 book ai didi

python - 在python多进程之间共享一个postgres连接池

转载 作者:行者123 更新时间:2023-12-03 14:55:46 27 4
gpt4 key购买 nike

我正在尝试将 psycopg2 的连接池与 python 的多进程库一起使用。

目前,尝试以上述方式在线程之间共享连接池会导致:

psycopg2.OperationalError: SSL error: decryption failed or bad record mac

下面的代码应该重现错误,警告读者必须设置一个简单的 postgres 数据库。


from multiprocessing import Pool
from psycopg2 import pool
import psycopg2
import psycopg2.extras


connection_pool = pool.ThreadedConnectionPool(1, 200, database='postgres', user='postgres',password='postgres', host='localhost')

class ConnectionFromPool:
"""
Class to establish a connection with the local PostgreSQL database
To use:
query = SELECT * FROM ticker_metadata
with ConnectionFromPool() as cursor:
cursor.execute(query)
results = cursor.fetchall()
Returns:
Arrayed Dictionary of results
[{...},{...},{...}]
"""
def __init__(self):
self.connection_pool = None
self.cursor = None
self.connection = None
def __enter__(self):
self.connection = connection_pool.getconn()
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
return self.cursor
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
self.connection.rollback()
else:
self.cursor.close()
self.connection.commit()
connection_pool.putconn(self.connection)


def test_query(col_attribute):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool() as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result


def multiprocessing(func, args, n_workers=2):
"""spawns multiple processes

Args:
func: function, to be performed
args: list of args to be passed to each call of func
n_workers: number of processes to be spawned

Return:
A list, containing the results of each proccess
"""
with Pool(processes=n_workers) as executor:
res = executor.starmap(func, args)

return list(res)


def main():
args = [[i] for i in range(1000)]
results = multiprocessing(test_query, args, 2)


if __name__ == "__main__":
main()

我已经尝试过的:
  • 让每个进程打开和关闭自己的数据库连接,而不是尝试使用连接池。这很慢。
  • 让每个进程使用自己的连接池,这也很慢。
  • 将一个 psycopg2 连接对象传递给每个进程,而不是使用 with 隐式调用它sql 查询中的语句。这会引发一个错误,声称连接对象不可pickle。

  • 注意:如果我放一个 sleep在除一个进程之外的所有进程中运行,非休眠进程运行良好并执行其查询,直到其余线程未休眠,然后我收到上述错误。

    我已经读过的:
  • Share connection to postgres db across processes in Python
  • Python: decryption failed or bad record mac when calling from Thread
  • Connection problems with SQLAlchemy and multiple processes

  • 最后:

    如何将连接池 (psycopg2) 与 python 的多进程(多处理)一起使用。我愿意使用其他库,只要它们与 python 和 postgresql 数据库一起工作。

    最佳答案

    这是我的解决方案。解决方案可以分为两部分:

  • 具有将由每个唯一进程执行的包装函数。这个包装函数的主要目的是创建自己的连接池
  • 对于步骤 1 中包装函数执行的每个查询,将连接池传递给查询函数(在上面的示例中,这是 test_query)

  • 更详细地引用问题中的示例:
    第1步
    创建包装函数,每个进程将重用一个连接池:
    def multi_query(list_of_cols):
    # create a new connection pool per Process
    new_pool = new_connection_pool()

    # Pass the pool to each query
    for col in list_of_cols:
    test_query(col, new_pool)
    第2步
    修改查询函数接受一个连接池:
    test_query :
    def test_query(col_attribute):
    """
    Simple SQL query
    """
    query = f"""SELECT *
    FROM col
    WHERE col = {col_attribute}
    ;"""
    with ConnectionFromPool() as cursor:
    cursor.execute(query)
    result = cursor.fetchall()
    return result
    新品 test_query :
    def test_query(col_attribute, connection_pool=None):
    """
    Simple SQL query
    """
    query = f"""SELECT *
    FROM col
    WHERE col = {col_attribute}
    ;"""
    with ConnectionFromPool(connection_pool) as cursor:
    cursor.execute(query)
    result = cursor.fetchall()
    return result

    关于python - 在python多进程之间共享一个postgres连接池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57214664/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com