gpt4 book ai didi

python - 如何有效地让多处理进程读取不可变的大数据

转载 作者:行者123 更新时间:2023-11-30 23:30:21 26 4
gpt4 key购买 nike

我想使用多处理来使用多个核心来运行一个对大列表中的元素进行成对比较的过程:

data = [...] #when loaded this is > 100MB
for i in xrange(len(data)-1):
parent = data[i]
for j in xrange(i,len(data)):
child = data[j]
#do something with parent and child

所以如果我设置一个进程队列:

def worker(queue):
while True:
args = queue.get()
if args == 'EOF':
break
f(*args)

def f(data, x, start):
for i in xrange(start,len(data)):
#do stuff

if __name__ == '__main__':
from multiprocessing import Process, Queue, cpu_count
import psycopg2

cur = psycopg2.connect(...).cursor()
data = cur.execute('SELECT * from table')
#when loaded into memory data is > 100MB

other_f_arg = 'some object'

queue = Queue()
#spawn 1 child per core:
workers = [Process(target=worker, args=((queue,)) for cpu in xrange(cpu_count())]
for w in workers:
w.start()

for i in xrange(len(data)-1):
queue.put((data, other_f_arg, i))

queue.put('EOF')
for w in workers:
w.join()

当它运行时,queue.put 在每次迭代时将 data 推送到队列中,即使数据只需要读取一次,然后每个进程重新引用即可。因此,多进程的所有优点都被重复的数据传递所抵消。如何让每个进程只获取一次 dataother_f_arg 的副本,然后只传递动态变量 i 作为工作人员被释放了?

更新1:

我决定按照 Tim Peters 的建议使用 Pool,但我没有使用 map,而是使用 apply_async 和回调(因为我希望父进程以串行方式对 f 返回进行一些后处理,而不是等待所有提交完成(因为 f 将返回内存中较大的内容)也):

def worker_init(xdata):
global data
data = xdata

def callback(result, x):
#do something with result of f(i), and x

def f(i):
#do something with data[i]
return result

if __name__ == '__main__':
...
data = psycopg2_cursor.fetchall()

NUM_CPU = None
from multiprocessing import Pool
from functools import partial


pool = Pool(processes=NUM_CPU,
initializer=worker_init,
initargs=(data,))

x = 'some extra param I want to pass to callback'
shim_callback = partial(callback, x=x)

for i in xrange(len(data)-1):
pool.apply_async(f,
args=(i,),
callback=shim_callback)

pool.close()
pool.join()

有什么方法可以将子级中未捕获的异常重定向到控制台吗? (就像在单线程进程中引发的异常?)我问是因为 f 中未捕获的异常似乎只是破坏了调用 apply_async 的循环,并且我没有得到任何错误控制台或任何东西。

最佳答案

最简单:在 Linux-y 系统(支持 fork() 的操作系统)上,在模块级别定义 data。然后,由于神奇的 fork() 语义,所有工作进程都会神奇地看到(一份)data

更便携:使用multiprocessing.Pool()代替。创建池时,您可以指定要运行的初始化函数以及要传递给该函数的参数。然后,每个进程只需将data传递给某个函数,例如将其绑定(bind)到模块全局名称。其他函数可以只引用该模块全局。 Pool() 还支持多种传递工作(和检索结果)的方法,这些方法不需要您显式管理队列。这里不知道足够的详细信息来表明这对于您的具体问题是更好还是更糟。

充实“可移植”方式

这是一种方法:

NUM_CPU = None  # defaults to all available CPUs

def worker_init(xdata, xother_f_arg):
global data, other_f_arg
data = xdata
other_f_arg = xother_f_arg

def f(start):
for i in xrange(start, len(data)):
#do stuff

if __name__ == '__main__':
from multiprocessing import Pool
import psycopg2

cur = psycopg2.connect(...).cursor()
data = cur.execute('SELECT * from table')
other_f_arg = 'some object'

pool = Pool(processes=NUM_CPU,
initializer=worker_init,
initargs=(data, other_f_arg))
pool.map(f, xrange(len(data) - 1))
pool.close()
pool.join()

请注意,它的代码也比吊起您自己的队列要少得多。

虽然我无法确定运行您的代码,但我希望您最好不要使用多处理传递巨大的数据 > 机器,而不是让每个工作人员从数据库加载自己的副本。大致如下:

def worker_init(xother_f_arg):
import psycopg2
global data, other_f_arg
other_f_arg = xother_f_arg
cur = psycopg2.connect(...).cursor()
data = cur.execute('SELECT * from table')

编辑 - 处理错误

并行技巧很难在子进程(或线程)中引发异常,因为它们发生在通常与主程序当时正在执行的操作无关的上下文中。处理此问题的最简单方法是保留对您正在创建的 AsyncResult 对象的引用,并显式地从中获取 .get() 结果(丢失回调!这毫无用处)这里的复杂性)。替换您的:

for i in xrange(len(data)-1):
pool.apply_async(f,
args=(i,),
callback=shim_callback)

例如,

# queue up all the work
futures = [pool.apply_async(f, args=(i,))
for i in xrange(len(data) - 1)]
# retrieve results
for fut in futures:
try:
result = fut.get()
except NameExceptionsYouWantToCatchHere as e:
# do whatever you want with the exception
else:
# process result

来自文档(当前的 Python 2):

get([timeout])

Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

在 Python 3 中,还有一个 map_async() 方法,以及许多 Pool() 方法上的可选 error_callback 参数。

注意:如果len(data) i如果非常大,multiprocessing 机制会消耗相应大量的 RAM 来对所有工作项进行排队 - apply_async() 永远不会阻塞,并且循环会对工作项进行排队尽可能快。在这种情况下,可能需要另一层缓冲。

关于python - 如何有效地让多处理进程读取不可变的大数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20640840/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com