python - Multiprocessing passing an array of dictionaries through shared memory


The following code works, but it is very slow because of the large dataset being passed to each process. In my actual implementation, creating the processes and sending the data takes nearly as long as the calculation itself, so by the time the second process is created the first one is almost finished computing, which makes the parallelization pointless.

The code is the same as in the question Multiprocessing has cutoff at 992 integers being joined as result, with the changes suggested there implemented and working below. However, I have run into what seems to be a common problem for others as well: processing large amounts of data takes a very long time.

I have seen answers that pass a shared-memory array using multiprocessing.Array. My array has about 4000 indices, but each index holds a dictionary with 200 key/value pairs. Each process only reads the data, performs some calculation, and then returns a matrix (4000x3) with no dictionaries.

Answers such as Is shared readonly data copied to different processes for Python multiprocessing? use map. Is it possible to keep the structure below while adding shared memory? Is there an efficient way to send the data to each process given an array of dictionaries, for example by wrapping the dictionaries in some manager and then placing that into a multiprocessing.Array?
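(Editor's note on that last idea: multiprocessing.Array only holds flat ctypes buffers, so it cannot store dictionaries directly. It could, however, hold the purely numeric 4000x3 result matrix. A minimal sketch of that idea; the shape is just the one described above, and treating the result as rows of three doubles is an assumption:)

from multiprocessing import Array

# A flat, zero-initialized shared buffer of 4000*3 doubles.
# A worker could write row k as results[k*3:(k+1)*3] = [a, b, c]
# without pickling or copying the buffer itself.
results = Array('d', 4000 * 3)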

import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Set up an empty list to store our processes
    procs = []
    #Divide up the data for the set number of processes
    interval = (end-start)/nprocs
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Clamp the last chunk to the end so no trailing items are dropped
        if i == nprocs - 1 or new_end > end:
            new_end = end
        #Copy this worker's slice of the data
        data = myData[new_start:new_end]
        #Create the process and pass it the slice and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Advance the next start to the current end (slice upper bounds are exclusive)
        new_start = new_end
    print 'finished starting'

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

Solved

Simply putting the list of dictionaries into a Manager solved the problem.

manager = Manager()
d = manager.list(myData)

It appears that the manager holding the list also manages the dictionaries that the list contains. Startup is somewhat slow, so it looks like the data is still being copied, but that copy happens only once at the start, and the data is then sliced inside each process.
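(Editor's aside, not from the original post: the nested dictionaries are stored by value inside the manager process rather than individually proxied, so indexing the ListProxy hands back ordinary copies. That is fine here because the workers only read. A small sketch of the access semantics:)

from multiprocessing import Manager

manager = Manager()
d = manager.list([{'0': 0, '1': 1}])  # dicts are pickled into the manager once

item = d[0]       # one IPC call; returns a plain dict copy
item['0'] = 99    # mutates only the local copy, not the shared data
print d[0]['0']   # still 0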

import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Set up an empty list to store our processes
    procs = []
    #Divide up the data for the set number of processes
    interval = (end-start)/nprocs
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Clamp the last chunk to the end so no trailing items are dropped
        if i == nprocs - 1 or new_end > end:
            new_end = end
        #Create the process and pass it the shared list proxy and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Advance the next start to the current end (slice upper bounds are exclusive)
        new_start = new_end
    print 'finished starting'

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    #Pull this worker's slice out of the manager proxy as a plain local list
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

Best Answer

You may see some improvement by using multiprocessing.Manager to store your list in a manager server, and having each child process access items by pulling dictionaries out of that one shared list rather than copying a slice to every child process:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        #Clamp the last chunk to the end so no trailing items are dropped
        if i == nprocs - 1 or new_end > end:
            new_end = end
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Advance the next start to the current end (slice upper bounds are exclusive)
        new_start = new_end
    print 'finished starting'

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

This copies your entire data list into the Manager process before any workers are created. The Manager returns a Proxy object that allows shared access to the list. You then simply pass the Proxy to the workers, which means their startup time is greatly reduced, since slices of the data list no longer need to be copied. The downside is that accessing the list will be slower in the children, because every access has to go through IPC to the manager process. Whether or not this actually helps performance depends heavily on what work you do on the list in your worker processes, but it is worth a try since it requires very few code changes.
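(Editor's sketch building on that trade-off, not part of the original answer: since slicing a ListProxy returns an ordinary local list, each worker can pay the IPC cost once by pulling its whole slice up front and then computing at local speed, which is exactly what the "Solved" code above does. The per-item computation here is a hypothetical placeholder:)

def multiProcess(data, start, end, result_q, proc_num):
    #One IPC round-trip: the slice arrives as a plain list of dicts
    local = data[start:end]
    results = []
    for item in local:
        #Hypothetical placeholder computation producing one 3-element row
        results.append([len(item), proc_num, 0])
    result_q.put(results)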

On python - Multiprocessing passing an array of dictionaries through shared memory, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/25620211/
