gpt4 book ai didi

python - 在 Python 多处理中将 Pool.map 与共享内存数组结合起来

转载 作者:太空宇宙 更新时间:2023-11-03 20:43:25 25 4
gpt4 key购买 nike

我有一个非常大(只读)的数据数组,希望由多个进程并行处理。

我喜欢 Pool.map 函数,并希望使用它来并行计算该数据的函数。

我发现可以使用 ValueArray 类在进程之间使用共享内存数据。但是当我尝试使用它时,我得到一个 RuntimeError: 'SynchronizedString 对象只能在使用 Pool.map 函数时通过继承在进程之间共享:

这是我正在尝试做的事情的简化示例:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
return count

if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)

# this works
print count_it( toShare, "a" )

pool = Pool()

# RuntimeError here
print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

谁能告诉我我在这里做错了什么?

所以我想做的就是在进程池中创建进程后,将有关新创建的共享内存分配数组的信息传递给进程。

最佳答案

再次尝试,因为我刚刚看到了赏金;)

基本上,我认为错误消息的意思就是它所说的 - 多处理共享内存数组不能作为参数传递(通过 pickling)。序列化数据没有意义——关键是数据是共享内存。所以你必须使共享数组成为全局的。我认为将其作为模块的属性更简洁,就像我的第一个答案一样,但在示例中将其保留为全局变量也效果很好。考虑到您不想在 fork 之前设置数据的观点,这里是一个修改后的示例。如果您想要多个可能的共享数组(这就是为什么您想将 toShare 作为参数传递),您可以类似地创建共享数组的全局列表,然后将索引传递给 count_it (这将成为 for toShare[i]: 中的 c)。

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
count = 0
for c in toShare:
if c == key:
count += 1
return count

if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)

# fork
pool = Pool()

# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData

print pool.map( count_it, ["a", "b", "s", "d"] )

[编辑:由于不使用 fork,以上内容在 Windows 上不起作用。但是,下面的代码在 Windows 上仍然有效,仍然使用 Pool,所以我认为这是最接近您想要的:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
count = 0
for c in mymodule.toShare:
if c == key:
count += 1
return count

def initProcess(share):
mymodule.toShare = share

if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)

# fork
pool = Pool(initializer=initProcess,initargs=(toShare,))

# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData

print pool.map( count_it, ["a", "b", "s", "d"] )

不知道为什么map不会Pickle数组,但Process和Pool会 - 我想它可能已经在Windows上的子进程初始化时被传输了。请注意,数据在 fork 后仍然设置。

关于python - 在 Python 多处理中将 Pool.map 与共享内存数组结合起来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56740719/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com