gpt4 book ai didi

Python 多处理与 unpicklable 对象

转载 作者:太空宇宙 更新时间:2023-11-04 06:19:13 27 4
gpt4 key购买 nike

目标:

  • 使用带有线程或进程的 SQLAlchemy 在数据库中运行约 40 个巨大的查询,将相应的 SQLA ResultProxies在 Queue.Queue 中(由 multiprocessing.Manager 处理)
  • 同时,将结果写入.csv文件,其中包含多个消耗所述队列的进程

当前状态:

  • 运行查询和写入数据的 QueryThread 和 WriteThread 类;由于查询需要一些时间才能运行,因此 GIL 处理线程的方式不会造成明显的性能损失
  • 另一方面,写文件需要很长时间;事实上,尽管最初的想法是运行 WriteThread 类的多个线程,但使用单个线程可以获得最佳性能。

因此有了使用多处理的想法;我希望能够同时写入输出,而不是 CPU 绑定(bind)而是 I/O 绑定(bind)。

撇开背景不谈,这里是问题(本质上是一个设计问题)- multiprocessing library通过 pickle 对象然后将数据管道传输到其他生成的进程来工作;但是我尝试在 WriteWorker Process 中使用的 ResultProxy 对象和共享队列不可挑选,这导致以下消息(不是逐字记录,但足够接近):

pickle.PicklingError: Can't pickle object in WriteWorker.start()

所以对你们有用的人的问题是,关于可以避免此问题的潜在设计模式或方法的任何想法?这似乎是一个简单、经典的生产者-消费者问题,我想出了解决方案很简单,我只是想多了

感谢任何帮助或反馈!谢谢:)

编辑:这里有一些相关的代码片段,如果我可以提供任何其他上下文,请告诉我

来自父类:

#init manager and queues
self.manager = multiprocessing.Manager()
self.query_queue = self.manager.Queue()
self.write_queue = self.manager.Queue()


def _get_data(self):
#spawn a pool of query processes, and pass them query queue instance
for i in xrange(self.NUM_QUERY_THREADS):
qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args)
qt.daemon = True
# qt.setDaemon(True)
qt.start()

#populate query queue
self.parse_sql_queries()

#spawn a pool of writer processes, and pass them output queue instance
for i in range(self.NUM_WRITE_THREADS):
wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict)
wt.daemon = True
# wt.setDaemon(True)
wt.start()

#wait on the queues until everything has been processed
self.query_queue.join()
self.write_queue.join()

来自 QueryWorker 类:

def run(self):
while True:
#grabs host from query queue
query_tupe = self.query_queue.get()
table = query_tupe[0]
query = query_tupe[1]
query_num = query_tupe[2]
if query and table:
#grab connection from pool, run the query
connection = self.engine.connect()
print 'Running query #' + str(query_num) + ': ' + table
try:
result = connection.execute(query)
except:
print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: ' + str(sys.exc_info()[1])

#place result handle tuple into out queue
self.out_queue.put((table, result))

#signals to queue job is done
self.query_queue.task_done()

最佳答案

简单的答案是避免直接使用 ResultsProxy。而是使用 cursor.fetchall() 或 cursor.fetchmany(number_to_fetch) 从 ResultsProxy 获取数据,然后将数据传递到多处理队列。

关于Python 多处理与 unpicklable 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13594934/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com