- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我不是 python 专家,但我已经成功编写了一个多处理代码,该代码使用我 PC 中的所有 cpu 和内核。我的代码加载了一个非常大的数组,大约 1.6 GB,并且我需要在每个进程中更新该数组。幸运的是,更新包括向图像添加一些人造星星,并且每个进程都有一组不同的图像位置来添加人造星星。
图像太大,我无法在每次调用进程时创建新图像。我的解决方案是在共享内存中创建一个变量,这样可以节省大量内存。由于某种原因,它适用于 90% 的图像,但在某些区域,我的代码在我之前发送到进程的一些位置中添加了随机数。与我创建共享变量的方式有关吗?在我的代码执行过程中,进程是否互相干扰?
奇怪的是,当使用单 cpu 和单核时,图像是 100% 完美的,并且图像中没有添加随机数。您是否建议我一种在多个进程之间共享大型数组的方法?这是我的代码的相关部分。请阅读我定义变量 im_data 时的行。
import warnings
warnings.filterwarnings("ignore")
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes
class Worker(multiprocessing.Process):
def __init__(self, work_queue, result_queue):
# base class initialization
multiprocessing.Process.__init__(self)
# job management stuff
self.work_queue = work_queue
self.result_queue = result_queue
self.kill_received = False
def run(self):
while not self.kill_received:
# get a task
try:
i_range, psf_file = self.work_queue.get_nowait()
except Queue.Empty:
break
# the actual processing
print "Adding artificial stars - index range=", i_range
radius=16
x_c,y_c=( (psf_size[1]-1)/2, (psf_size[2]-1)/2 )
x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
distance = np.sqrt(x**2 + y**2)
for i in range(i_range[0],i_range[1]):
psf_xy=np.zeros(psf_size[1:3], dtype=float)
j=0
for i_order in range(psf_order+1):
j_order=0
while (i_order+j_order < psf_order+1):
psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
j_order+=1
j+=1
psf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(psf_xy)
psf_xy *= psf_factor
npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
npsf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
npsf_xy *= npsf_factor
im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.
self.result_queue.put(id)
if __name__ == "__main__":
n_cpu=2
n_core=6
n_processes=n_cpu*n_core*1
input_mock_file=sys.argv[1]
print "Reading file ", im_file[i]
hdu=pyfits.open(im_file[i])
data=hdu[0].data
im_size=data.shape
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
data=0
assert im_data.base.base is im_data_base.get_obj()
# run
# load up work queue
tic=time.time()
j_step=np.int(np.ceil( mock_n*1./n_processes ))
j_range=range(0,mock_n,j_step)
j_range.append(mock_n)
work_queue = multiprocessing.Queue()
for j in range(np.size(j_range)-1):
if work_queue.full():
print "Oh no! Queue is full after only %d iterations" % j
work_queue.put( (j_range[j:j+2], psf_file[i]) )
# create a queue to pass to workers to store the results
result_queue = multiprocessing.Queue()
# spawn workers
for j in range(n_processes):
worker = Worker(work_queue, result_queue)
worker.start()
# collect the results off the queue
while not work_queue.empty():
result_queue.get()
print "Writing file ", mock_im_file[i]
hdu[0].data=im_data
hdu.writeto(mock_im_file[i])
print "%f s for parallel computation." % (time.time() - tic)
最佳答案
我认为问题(正如您在问题中所建议的那样)来自于您从多个线程在同一个数组中写入这一事实。
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
虽然我非常确定您可以以“进程安全”的方式写入 im_data_base
(Python 使用隐式锁来同步对数组的访问),但我不确定您是否可以可以以进程安全的方式写入 im_data
。
因此,我(尽管我不确定是否能解决您的问题)建议您在 im_data
周围创建一个显式锁
# Disable python implicit lock, we are going to use our own
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1],
lock=False)
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
# Create our own lock
im_data_lock = Lock()
然后在进程中,每次需要修改im_data
时获取锁
self.im_data_lock.acquire()
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10
self.im_data_lock.release()
为了简洁起见,我省略了将锁传递给进程的构造函数并将其存储为成员字段 (self.im_data_lock
) 的代码。您还应该将 im_data
数组传递给流程的构造函数并将其存储为成员字段。
关于Python 多处理和共享变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15903788/
只是想知道是否有可能找出谁从 Windows 共享中读取了文件(最好使用 .NET,但 win32 native 可以)? 我想做的是创建类似 awstats 的东西对于 Windows 共享,这样我
是否可以列出 Intent.ACTION_SEND ?我的意思是我需要知道是否有人通过 action_send 在 Facebook 上分享或在 Twitter 上发推文。 最佳答案 也许你想要一个更
我正在使用 Google Apps 应用程序。实际上,我想在不使用密码的情况下访问另一个 ID。我使用了 OAuth,它运行良好。但我无法分享特定人的日历。我尝试了以下代码。 GoogleOAuthP
我怎样才能只创建模拟器...可能吗?我知道,设备需要分发证书。 最佳答案 您只需将应用程序目录从 iPhone 模拟器复制到另一个实例/操作系统版本,它就应该可以工作。 因此,如果您想分发 3.1.3
我想使用多阶段构建来避免每次构建应用程序时都下载我的 Java 项目所需的所有 Maven 依赖项。 我正在考虑在第一阶段解决 Maven 依赖项,然后在第二阶段构建应用程序,这将需要访问在前一阶段下
我正在寻找保护用户下载内容的初步想法。用户下载充满有趣资源的 zip 文件,这些资源被提取到本地文件系统中以供应用程序使用。我的目标是防止用户通过互联网将下载的资源共享给其他用户(假设他们获得了对文件
我想知道在具有移动和桌面版本的网站上共享身份验证、 session 管理等的最佳方法是什么。我们正在运行 Tomcat,并且更愿意将移动站点和桌面站点的应用程序保持在不同的节点上。 我看过类似的帖子,
我发现了这个单例的实现。我怎样才能创建指向它的指针或共享指针?` 为什么这不起作用?自动测试 = Singleton::Instance(); class Singleton { public: st
我有一个 heroku 项目,我想与其他人分享。作为the instructions describe ,我使用 virtualenv 来管理环境和依赖项。有没有办法在新机器上从 requiremen
Maven 将所有 jar 存储在本地存储库 ~/.m2/repository/ 下。用户多时占用空间大。 那么,是否可以由多个用户共享这个本地存储库,或许在不同的目录结构下? 最佳答案 简单的回答
为什么共享 worker 在重新加载页面时死了?应该是复活了我该如何解决这个问题? 重装前 重新加载后(在example.com上按F5) parent worker var port = new S
我正在开发多个小型应用程序,这些应用程序将共享通用和共享模块和 Assets 。 关于如何创建项目结构的部分在这里回答:https://stackoverflow.com/a/61254557/135
我在 RHEL 上安装了 jenkins (localhost:8080),我能够成功地构建代码 现在,我想设置主/从代理。 我的笔记本电脑将充当“Master Jenkins”,而我同事的笔记本电脑
我有这种方法可以根据我使用的 EXTRA_STREAM 共享文本文件或图片。我有这两个我可以选择 i.putExtra(Intent.EXTRA_STREAM, uri); i.putExtra(In
我正在使用 R 中的一个数据分析项目,我正在使用 R 中的敏感私有(private)数据进行一些逻辑和多级建模。我爱上了 。预订 包,我已经创建了一本关于我们的工作流程和分析管道的相当广泛的书。问题是
我正在构建的应用程序需要在 UITabBarController 框架内为多个 View (及其 subview )显示共享的自定义 UIToolbar。自定义工具栏的内容在所有 View 中都是相同
我有多个应用程序,我想共享相同的 eslint 配置: - project_root/ - app1/ - node_modules/ - eslint.rc
我有多个 Electron 应用程序。一个是主应用程序,其他几个功能应用程序。主应用程序上的按钮很少,这将导致功能应用程序打开。这里的问题是每个应用程序都有一个主进程,该进程导致要利用更多的CPU。是
我正在开发一个 Node.js 后端,它通过 websocket 与一些桌面客户端进行通信,而服务器端的通信是从 Web 前端发起的。一切正常,因为我将 SockJS Connection 实例存储在
我对托管多个网站的服务器上的多个用户帐户使用私有(private) SSH key 和无密码条目。 我为每个用户帐户使用相同的私钥。 (因为我很懒?或者那是“正确”的方式)。 我现在想授权该国不同地区
我是一名优秀的程序员,十分优秀!