gpt4 book ai didi

Python 多处理和共享变量

转载 作者:行者123 更新时间:2023-12-02 09:17:35 26 4
gpt4 key购买 nike

我不是 python 专家,但我已经成功编写了一个多处理代码,该代码使用我 PC 中的所有 cpu 和内核。我的代码加载了一个非常大的数组,大约 1.6 GB,并且我需要在每个进程中更新该数组。幸运的是,更新包括向图像添加一些人造星星,并且每个进程都有一组不同的图像位置来添加人造星星。

图像太大,我无法在每次调用进程时创建新图像。我的解决方案是在共享内存中创建一个变量,这样可以节省大量内存。由于某种原因,它适用于 90% 的图像,但在某些区域,我的代码在我之前发送到进程的一些位置中添加了随机数。与我创建共享变量的方式有关吗?在我的代码执行过程中,进程是否互相干扰?

奇怪的是,当使用单 cpu 和单核时,图像是 100% 完美的,并且图像中没有添加随机数。您是否建议我一种在多个进程之间共享大型数组的方法?这是我的代码的相关部分。请阅读我定义变量 im_data 时的行。

import warnings
warnings.filterwarnings("ignore")

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes

class Worker(multiprocessing.Process):


def __init__(self, work_queue, result_queue):

# base class initialization
multiprocessing.Process.__init__(self)

# job management stuff
self.work_queue = work_queue
self.result_queue = result_queue
self.kill_received = False

def run(self):
while not self.kill_received:

# get a task
try:
i_range, psf_file = self.work_queue.get_nowait()
except Queue.Empty:
break

# the actual processing
print "Adding artificial stars - index range=", i_range

radius=16
x_c,y_c=( (psf_size[1]-1)/2, (psf_size[2]-1)/2 )
x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
distance = np.sqrt(x**2 + y**2)

for i in range(i_range[0],i_range[1]):
psf_xy=np.zeros(psf_size[1:3], dtype=float)
j=0
for i_order in range(psf_order+1):
j_order=0
while (i_order+j_order < psf_order+1):
psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
j_order+=1
j+=1


psf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(psf_xy)
psf_xy *= psf_factor

npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
npsf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
npsf_xy *= npsf_factor

im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]

im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.


self.result_queue.put(id)

if __name__ == "__main__":

n_cpu=2
n_core=6
n_processes=n_cpu*n_core*1
input_mock_file=sys.argv[1]

print "Reading file ", im_file[i]
hdu=pyfits.open(im_file[i])
data=hdu[0].data
im_size=data.shape

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
data=0
assert im_data.base.base is im_data_base.get_obj()

# run
# load up work queue
tic=time.time()
j_step=np.int(np.ceil( mock_n*1./n_processes ))
j_range=range(0,mock_n,j_step)
j_range.append(mock_n)


work_queue = multiprocessing.Queue()
for j in range(np.size(j_range)-1):
if work_queue.full():
print "Oh no! Queue is full after only %d iterations" % j
work_queue.put( (j_range[j:j+2], psf_file[i]) )

# create a queue to pass to workers to store the results
result_queue = multiprocessing.Queue()

# spawn workers
for j in range(n_processes):
worker = Worker(work_queue, result_queue)
worker.start()

# collect the results off the queue
while not work_queue.empty():
result_queue.get()

print "Writing file ", mock_im_file[i]
hdu[0].data=im_data
hdu.writeto(mock_im_file[i])
print "%f s for parallel computation." % (time.time() - tic)

最佳答案

我认为问题(正如您在问题中所建议的那样)来自于您从多个线程在同一个数组中写入这一事实。

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data

虽然我非常确定您可以以“进程安全”的方式写入 im_data_base (Python 使用隐式锁来同步对数组的访问),但我不确定您是否可以可以以进程安全的方式写入 im_data

因此,我(尽管我不确定是否能解决您的问题)建议您在 im_data 周围创建一个显式

# Disable python implicit lock, we are going to use our own
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1],
lock=False)
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
# Create our own lock
im_data_lock = Lock()

然后在进程中,每次需要修改im_data时获取锁

self.im_data_lock.acquire()
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10
self.im_data_lock.release()

为了简洁起见,我省略了将锁传递给进程的构造函数并将其存储为成员字段 (self.im_data_lock) 的代码。您还应该将 im_data 数组传递给流程的构造函数并将其存储为成员字段。

关于Python 多处理和共享变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15903788/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com