gpt4 book ai didi

python-3.x - python apply_async不调用方法

转载 作者:行者123 更新时间:2023-12-01 22:37:17 25 4
gpt4 key购买 nike

我有一个需要处理大型数据库的方法,这需要几个小时/几天的时间来挖掘参数存储在一个(长)列表中,其中 max X 应在一批中处理。该方法不需要返回任何内容,但我为“有趣”返回“True”...

当我线性迭代它时(在此处未看到的其他表中生成/附加结果),该函数工作正常,但我无法获得 apply_async 或 map_async 工作。 (之前在其他项目中有效)任何有关我可能做错的事情的暗示将不胜感激,提前致谢!请参阅下面的代码:

    import multiprocessing as mp

class mainClass:
#loads of stuff

def main():
multiprocess = True
batchSize = 35

mC = mainClass()
while True:
toCheck = [key for key, value in mC.lCheckSet.items()] #the tasks are stored in a dictionary, I'm referring to them with their keys, which I turn to a list here for iteration.
if multiprocess == False:
#this version works perfectly fine
for i in toCheck[:batchSize]:
mC.check(i)
else:
#the async version does not, either with apply_async...
with mp.Pool(processes = 8) as pool:
temp = [pool.apply_async(mC.check, args=(toCheck[n],)) for n in range(len(toCheck[:batchSize]))]
results = [t.get() for t in temp]

#...or as map_async
pool = mp.Pool(processes = 8)
temp = pool.map_async(mC.check, toCheck[:batchSize])
pool.close()
pool.join()

if __name__=="__main__":
main()

最佳答案

这里的“味道”是,您在主进程上实例化 maincClass 一次,然后尝试在不同进程上调用它的方法 - 但请注意,当您通过 mC.check 到你的进程池,它是一个已经绑定(bind)到该进程中实例化的类的方法。

我猜你的问题就出在这里。尽管这可能有效 - 而且确实有效 - 我制作了这个简化版本并且它按预期工作:

import multiprocessing as mp
import random, time

class MainClass:
def __init__(self):
self.value = 1
def check(self, arg):
time.sleep(random.uniform(0.01, 0.3))
print(id(self),self.value, arg)

def main():
mc = MainClass()

with mp.Pool(processes = 4) as pool:
temp = [pool.apply_async(mc.check, (i,)) for i in range(8)]
results = [t.get() for t in temp]

main()

(您是否尝试过添加一些 print 以确保该方法根本没有运行?)因此,问题可能在于 MainClass 中的某些复杂状态,这些状态无法以良好的方式进入并行进程。一种可能的解决方法是在每个进程内实例化您的主类 - 这可以轻松完成,因为 MultiProcessing 允许您获取 current_process ,并使用此对象作为命名空间来保存进程中实例化的数据工作池,跨不同的调用来应用异步。

因此,创建一个新的 check 函数,如下所示 - 并且不要在主进程中实例化主类,而是在池中的每个进程中实例化它:

import multiprocessing as mp
import random, time

def check(arg):
process = mp.current_process
if not hasattr(process, "main_class"):
process.main_class = MainClass()
process.main_class.check(arg)


class MainClass:
def __init__(self):
self.value = random.randrange(100)
def check(self, arg):
time.sleep(random.uniform(0.01, 0.3))
print(id(self),self.value, arg)

def main():
mc = MainClass()

with mp.Pool(processes = 2) as pool:
temp = [pool.apply_async(check, (i,)) for i in range(8)]
results = [t.get() for t in temp]

main()

关于python-3.x - python apply_async不调用方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41379464/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com