gpt4 book ai didi

python - 多处理另一个函数的函数

转载 作者:太空宇宙 更新时间:2023-11-03 11:39:16 24 4
gpt4 key购买 nike

我正在对模拟的时间序列进行分析。基本上,它在每个时间步执行相同的任务。由于时间步数非常多,而且每个时间步的分析都是独立的,所以我想创建一个可以多处理另一个函数的函数。后者将有参数,并返回结果。

使用共享字典和 lib concurrent.futures,我设法写了这个:

import concurrent.futures as Cfut
def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# function : function that is running in parallel
# param_list : list of items
# group_size : size of the groups
# Nworkers : number of group/items running in the same time
# **param_fixed : passing parameters

manager = mlp.Manager()
dic = manager.dict()
executor = Cfut.ProcessPoolExecutor(Nworkers)

futures = [executor.submit(function, param, dic, *args)
for param in grouper(param_list, group_size)]

Cfut.wait(futures)
return [dic[i] for i in sorted(dic.keys())]

通常,我可以这样使用它:

def read_file(files, dictionnary):
for file in files:
i = int(file[4:9])
#print(str(i))
if 'bz2' in file:
os.system('bunzip2 ' + file)
file = file[:-4]
dictionnary[i] = np.loadtxt(file)
os.system('bzip2 ' + file)

Map = np.array(multiprocess_loop_grouped(read_file, list_alti, Group_size, N_thread))

或者像这样:

def autocorr(x):
result = np.correlate(x, x, mode='full')
return result[result.size//2:]

def find_lambda_finger(indexes, dic, Deviation):
for i in indexes :
#print(str(i))
# Beach = Deviation[i,:] - np.mean(Deviation[i,:])
dic[i] = Anls.find_first_max(autocorr(Deviation[i,:]), valmax = True)

args = [Deviation]
Temp = Rescal.multiprocess_loop_grouped(find_lambda_finger, range(Nalti), Group_size, N_thread, *args)

基本上,它是有效的。但效果不佳。有时它会崩溃。有时它实际上启动了与 Nworkers 数量相等的 python 进程,有时当我指定 Nworkers = 15 时,一次只有 2 或 3 个进程在运行。

例如,我提出的以下主题中描述了我遇到的一个经典错误:Calling matplotlib AFTER multiprocessing sometimes results in error : main thread not in main loop

什么是更 Pythonic 的方式来实现我想要的?我怎样才能改进控制这个功能?如何控制更多正在运行的 python 进程的数量?

最佳答案

Python 多处理的基本概念之一是使用队列。当您有一个可以迭代且不需要由子流程更改的输入列表时,它工作得很好。它还可以让您很好地控制所有进程,因为您可以生成所需的数量,您可以让它们闲置或停止。

它也更容易调试。显式共享数据通常是一种更难正确设置的方法。

队列可以容纳任何东西,因为根据定义它们是可迭代的。因此,您可以在其中填充用于读取文件的文件路径字符串、用于计算的不可迭代数字,甚至用于绘图的图像。

在您的情况下,布局可能如下所示:

import multiprocessing as mp
import numpy as np
import itertools as it


def worker1(in_queue, out_queue):
#holds when nothing is available, stops when 'STOP' is seen
for a in iter(in_queue.get, 'STOP'):
#do something
out_queue.put({a: result}) #return your result linked to the input

def worker2(in_queue, out_queue):
for a in iter(in_queue.get, 'STOP'):
#do something differently
out_queue.put({a: result}) //return your result linked to the input

def multiprocess_loop_grouped(function, param_list, group_size, Nworkers, *args):
# your final result
result = {}

in_queue = mp.Queue()
out_queue = mp.Queue()

# fill your input
for a in param_list:
in_queue.put(a)
# stop command at end of input
for n in range(Nworkers):
in_queue.put('STOP')

# setup your worker process doing task as specified
process = [mp.Process(target=function,
args=(in_queue, out_queue), daemon=True) for x in range(Nworkers)]

# run processes
for p in process:
p.start()

# wait for processes to finish
for p in process:
p.join()

# collect your results from the calculations
for a in param_list:
result.update(out_queue.get())

return result

temp = multiprocess_loop_grouped(worker1, param_list, group_size, Nworkers, *args)
map = multiprocess_loop_grouped(worker2, param_list, group_size, Nworkers, *args)

当您担心您的队列会耗尽内存时,它可以变得更加动态。比您需要在进程运行时填充和清空队列。看这个例子here .

结语:它并不像您要求的那样更加 Pythonic。但对于新手来说更容易理解 ;-)

关于python - 多处理另一个函数的函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53042797/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com