gpt4 book ai didi

python - 如何使用joblib并行写入文件?可加入队列问题

转载 作者:行者123 更新时间:2023-12-04 12:59:16 27 4
gpt4 key购买 nike

我正在尝试将运行超过 10 万个文件的计算结果写入单个文件。处理文件需要大约 1 秒,并将一行写入输出文件。问题本身是“令人尴尬的并行”,我只是在努力正确写入文件(例如 CSV)。这是很久以前对我有用的东西(Python 3.4?):

import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def save_to_file(q):
with open('test.csv', 'w') as out:
while True:
val = q.get()
if val is None: break
out.write(val + '\n')
q.task_done()

def process(x):
q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
q = JoinableQueue()
p = Process(target=save_to_file, args=(q,))
p.start()
Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
q.put(None)
p.join()

今天(在 Python 3.6+ 上)它产生以下异常:
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""

如何使用joblib正确写入单个文件?

最佳答案

原来实现任务的一种方法是通过 multiprocessing.Manager , 像这样:

import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed

def save_to_file(q):
with open('test.csv', 'w') as out:
while True:
val = q.get()
if val is None: break
out.write(val + '\n')

def process(x):
q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
m = Manager()
q = m.Queue()
p = Process(target=save_to_file, args=(q,))
p.start()
Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
q.put(None)
p.join()

我们让 Manager管理上下文,其余保持不变(没有使用正常的 Queue 代替 JoinableQueue )。

如果有人知道更好/更清洁的方式,我会很乐意接受它作为答案。

关于python - 如何使用joblib并行写入文件?可加入队列问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61025918/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com