- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我按照以下代码在 postgres 数据库上实现并行选择查询:
https://tech.geoblink.com/2017/07/06/parallelizing-queries-in-postgresql-with-python/
我的基本问题是我有大约 6k 个查询需要执行,我正在尝试优化这些选择查询的执行。最初它是一个包含所有 6k 谓词 ID 的 where id in (...)
查询,但我遇到了问题,查询在它运行的机器上耗尽了 > 4GB 的 RAM,所以我决定将其拆分为 6k 个单独的查询,这些查询在同步时保持稳定的内存使用。然而,明智地运行时间需要更长的时间,这对我的用例来说不是问题。尽管如此,我还是尽量减少时间。
我的代码是这样的:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.engine = self.init_connection()
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS)
def init_connection(self):
LOGGER.info('Creating Postgres engine')
return create_engine(self.db_url)
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
self.pool.close()
self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
con = psycopg2.connect(self.db_url)
cur = con.cursor()
cur.execute(query)
records = cur.fetchall()
con.close()
return list(records)
但是无论何时运行,我都会收到以下错误:
TypeError: can't pickle _thread.RLock objects
我读过很多关于使用多处理和可腌制对象的类似问题,但我终究无法弄清楚我做错了什么。
池通常是每个进程一个(我认为这是最佳实践),但每个连接器类的实例共享,因此它不会为每次使用 parallel_query 方法创建一个池。
类似问题的最佳答案:
Accessing a MySQL connection pool from Python multiprocessing
除了使用 MySql 而不是 Postgres 之外,显示了与我自己的几乎相同的实现。
我做错了什么吗?
谢谢!
编辑:
我找到了这个答案:
Python Postgres psycopg2 ThreadedConnectionPool exhausted
这非常详细,看起来我误解了 multiprocessing.Pool
与连接池(例如 ThreadedConnectionPool
)给我的含义。然而,在第一个链接中,它没有提到需要任何连接池等。这个解决方案看起来不错,但对于我认为是一个相当简单的问题,似乎有很多代码?
编辑 2:
所以上面的链接解决了另一个问题,我很可能会遇到这个问题,所以我很高兴我发现了这个问题,但它并没有解决最初无法使用 imap_unordered
的问题到酸洗错误。非常令人沮丧。
最后,我认为可能值得注意的是,它在 Heroku 中运行,在 worker dyno 上,使用 Redis rq 进行调度、后台任务等,并使用 Postgres 的托管实例作为数据库。
最佳答案
简单来说,postgres连接和sqlalchemy连接池是线程安全的,但是它们不是fork安全的。
如果你想使用多进程,你应该在fork之后的每个子进程中初始化引擎。
如果你想共享引擎,你应该使用多线程。
引用Thread and process safety in psycopg2 documentation :
libpq connections shouldn’t be used by a forked processes, so when using a module such as multiprocessing or a forking web deploy method such as FastCGI make sure to create the connections after the fork.
如果您正在使用 multiprocessing.Pool,则有一个关键字参数初始化器,可用于在每个子进程上运行一次代码。试试这个:
class PostgresConnector(object):
def __init__(self, db_url):
self.db_url = db_url
self.pool = self.init_pool()
def init_pool(self):
CPUS = multiprocessing.cpu_count()
return multiprocessing.Pool(CPUS, initializer=self.init_connection(self.db_url))
@classmethod
def init_connection(cls, db_url):
def _init_connection():
LOGGER.info('Creating Postgres engine')
cls.engine = create_engine(db_url)
return _init_connection
def run_parallel_queries(self, queries):
results = []
try:
for i in self.pool.imap_unordered(self.execute_parallel_query, queries):
results.append(i)
except Exception as exception:
LOGGER.error('Error whilst executing %s queries in parallel: %s', len(queries), exception)
raise
finally:
pass
#self.pool.close()
#self.pool.join()
LOGGER.info('Parallel query ran producing %s sets of results of type: %s', len(results), type(results))
return list(chain.from_iterable(results))
def execute_parallel_query(self, query):
with self.engine.connect() as conn:
with conn.begin():
result = conn.execute(query)
return result.fetchall()
def __getstate__(self):
# this is a hack, if you want to remove this method, you should
# remove self.pool and just pass pool explicitly
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
现在,解决 XY 问题。
Initially it was a single query with the where id in (...) contained all 6k predicate IDs but I ran into issues with the query using up > 4GB of RAM on the machine it ran on, so I decided to split it out into 6k individual queries which when synchronously keeps a steady memory usage.
您可能想要做的是以下选项之一:
但是,如果您坚持通过 python 运行 6000 个 ID,那么最快的查询很可能既不会一次性完成所有 6000 个 ID(这会耗尽内存),也不会运行 6000 个单独的查询。相反,您可能想尝试对查询进行分 block 。例如一次发送 500 个 ID。您将不得不试验 block 大小,以确定您一次可以发送的最大 ID 数,同时仍然在您的内存预算范围内。
关于python - 多处理/psycopg2 TypeError : can't pickle _thread. RLock 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52537741/
我在 https://code.google.com/p/pyloadtools/wiki/CodeTutorialMultiThreading 找到了这个简单的代码 import _thread d
我尝试从终端启动 webapp2 开发服务器,但收到此错误 > File "/Users/mertbarutcuoglu/Desktop/hellowebapp2/main.py", line 12,
从 ubuntu 10.04 开始,我使用 easy_install 安装了 pylab。升级后,我可以导入 pylab。首先,我运行从 easy_install 安装的 ipython: $ ipy
我正在尝试解析包含汽车属性(154 种属性)的网站。我有一个巨大的列表(名称是 liste_test),其中包含 280.000 个二手车公告 URL。 def araba_cekici(liste_
在此处检查 Stackoverflow 上的所有现有答案后:Checkpointing keras model: TypeError: can't pickle _thread.lock object
这是我的代码 class MusicHandler(object): """ Implements the logic to download musics """ def __ini
致所有尝试开始使用 docker-compose 的好伙伴。我正在运行 OS X El Capitan (10.11)。 系统附带 python 2.7。不建议尝试将系统 python 替换为 pyt
我读过《Core Python Applications Planning》,其中编写了这段代码。 import _thread from time import sleep, ctime loops
阅读了一些类似的问题,其中大多数提到你不应该尝试序列化一个不可序列化的对象。我无法理解这个问题。我可以将模型保存为 .h5 文件,但这并不能达到我想要做的目的。请帮忙! def image_g
我正在使用带有 flask 的 RQ 来循环排队作业。我有以下代码: from rq import Queue from rq.job import Job from worker import co
尝试使用共享队列同时运行两个不同的函数并出现错误...如何使用共享队列同时运行两个函数?这是 Windows 7 上的 Python 3.6 版。 from multiprocessing impor
from keras.layers import Embedding, Dense, Input, Dropout, Reshape from keras.layers.convolutional i
我尝试使用 Cassandra 和 multiprocessing 根据中的示例同时插入行(虚拟数据) http://www.datastax.com/dev/blog/datastax-python
我在 np.save 上收到此错误。请让我知道原因以及如何解决这个问题。下面是我的代码: import cv2 import numpy as np import os from random imp
我正在使用 Keras 创建 ANN 并在网络上进行网格搜索。运行以下代码时遇到以下错误: model = KerasClassifier(build_fn=create_model(input_di
我正在使用来自(事实上的标准)的 Redis 客户端 python 实现:https://pypi.org/project/redis/ 所以我在后台定义了多个worker,每个worker都有一个在
过去似乎在不同的上下文中发生了错误 here ,但我没有直接转储模型——我正在使用 ModelCheckpoint 回调。知道可能出了什么问题吗? 信息: Keras 2.0.8 版 Tensorfl
我遇到了 pydantic.BaseSettings 的问题和 prometheus_client.Summary . 下面的代码片段在尝试执行时抛出异常: from prometheus_clien
在运行 Python 3.6 的 Windows 10 系统上,尝试使用 multiprocessing.Process 时创建一个新的rq worker , multiprocessing.Proc
我使用的是python 3.6 我正在尝试从下面显示的名称 SubmitJobsUsingMultiProcessing() 的类方法内部使用多重处理,该方法进一步依次调用另一个类方法。 我不断遇到此
我是一名优秀的程序员,十分优秀!