gpt4 book ai didi

python - 在多处理 apply_async 中维护实例状态

转载 作者:行者123 更新时间:2023-12-03 21:14:23 32 4
gpt4 key购买 nike

我希望如果我调用 apply_async在实例方法中并获得其结果,所做的任何更改都将保留为 fork 进程的一部分。但是,似乎每次对 apply_async 的新调用都会创建所述实例的新副本。

采取以下代码:

from multiprocessing.pool import Pool


class Multitest:
def __init__(self):
self.i = 0

def run(self):
with Pool(2) as pool:
worker_jobs = []
for j in range(10):
job = pool.apply_async(self.process, (j,))
worker_jobs.append(job)

for job in worker_jobs:
res = job.get()
print("input", res)

def process(self, inp):
print("i", self.i)
self.i += 1

return inp

if __name__ == '__main__':
mt = Multitest()
mt.run()

样本输出:
i 0
i 0
i 0
i 0
i 0
input 0
i 0
i 0
i 0
i 0
i 0
input 1
input 2
input 3
input 4
input 5
input 6
input 7
input 8
input 9

但由于我们有两个核心,其中分布有 10 个输入,我预计 i要增加的属性。

我曾期望以下流程:
  • 主线程创建实例并调用run()
  • 主线程分配 apply_async 的工作通过初始化两个新进程和原始 Multitest 实例的副本(其中 i = 0 )
  • process()在新进程上被调用多次(直到 range() 用完)。在每次调用处理时,self.i为该进程递增

  • 备注 :我不是在询问两个进程之间的共享状态。相反,我问的是为什么单个进程的类实例没有发生变异(为什么每个单独进程的 self.i 没有增加)。

    但是,我没有看到这种行为。相反,打印的输出只有零,表明我的预期是错误的:状态(属性 i )没有被维护,但每次调用 apply_async 时都会创建一个新实例(或至少一个新副本) .我在这里缺少什么,我怎样才能使这项工作按预期进行? (最好使用 apply_async ,尽管不是必需的。但应保持结果的顺序。)

    据我所知,这种行为并非特定于 apply_async还有其他 pool方法。我有兴趣了解为什么会发生这种情况以及如何将行为更改为我想要实现的行为。赏金会找到可以为这两个查询提供答案的答案。

    最佳答案

    我想向您指出引用资料,但我还没有,所以我将根据经验证据分享我的想法:

    每次调用 apply_async 都会准备一个新的命名空间副本。您可以通过添加对 print(self) 的调用来查看这一点。进程内。所以这部分是不正确的:

    main thread distributes work ... by initializing two new processes and a copy of the original Multitest instance



    相反,有两个新进程和原始 Multitest 实例的十个副本。所有这些副本都是从主进程制作的,它的 i 副本没有增加。为了证明这一点,添加 time.sleep(1); self.i += 1在调用 apply_async 之前,请注意 a) 主线程中 i 的值增加,并且 b) 通过延迟 for 循环,原始 Multitest 实例在下一次调用 apply_async 触发新副本时发生了变化。

    代码:
    from multiprocessing.pool import Pool
    import time

    class Multitest:
    def __init__(self):
    print("Creating new Multitest instance: {}".format(self))
    self.i = 0

    def run(self):
    with Pool(2) as pool:
    worker_jobs = []
    for j in range(4):
    time.sleep(1); self.i += 1
    job = pool.apply_async(self.process, (j,))
    worker_jobs.append(job)

    for job in worker_jobs:
    res = job.get()
    print("input", res)

    def process(self, inp):
    print("i", self.i)
    print("Copied instance: {}".format(self))
    self.i += 1

    return inp

    if __name__ == '__main__':
    mt = Multitest()
    mt.run()

    结果:
    Creating new Multitest instance: <__main__.Multitest object at 0x1056fc8b0>
    i 1
    Copied instance: <__mp_main__.Multitest object at 0x101052d90>
    i 2
    Copied instance: <__mp_main__.Multitest object at 0x101052df0>
    i 3
    Copied instance: <__mp_main__.Multitest object at 0x101052d90>
    input 0
    input 1
    input 2
    i 4
    Copied instance: <__mp_main__.Multitest object at 0x101052df0>
    input 3

    至于您的第二个查询,我认为如果您希望在流程中维护状态,您可能只需要提交一项工作。 Pool(2) 不是处理 10 个独立的作业,而是让 Pool(2) 处理 2 个独立的作业,每个作业由 5 个相互依赖的子作业组成。或者,如果您真的想要 10 个作业,您可以使用由 pid 索引的共享数据结构,这样在单个进程中(按顺序)操作的所有作业都可以操作 i 的单个副本。

    这是一个具有共享数据结构的示例,采用模块中的全局形式:
    from multiprocessing.pool import Pool
    from collections import defaultdict
    import os
    import myglobals # (empty .py file)

    myglobals.i = defaultdict(lambda:0)

    class Multitest:
    def __init__(self):
    pid = os.getpid()
    print("Creating new Multitest instance: {}".format(self))
    print("i {} (pid: {})".format(myglobals.i[pid], pid))

    def run(self):
    with Pool(2) as pool:
    worker_jobs = []
    for j in range(4):
    job = pool.apply_async(self.process, (j,))
    worker_jobs.append(job)

    for job in worker_jobs:
    res = job.get()
    print("input", res)

    def process(self, inp):
    pid = os.getpid()
    print("Copied instance: {}".format(self))
    print("i {} (pid: {})".format(myglobals.i[pid], pid))
    myglobals.i[pid] += 1

    return inp

    if __name__ == '__main__':
    mt = Multitest()
    mt.run()

    结果:
    Creating new Multitest instance: <__main__.Multitest object at 0x1083f3880>
    i 0 (pid: 3460)
    Copied instance: <__mp_main__.Multitest object at 0x10d89cdf0>
    i 0 (pid: 3463)
    Copied instance: <__mp_main__.Multitest object at 0x10d89ce50>
    Copied instance: <__mp_main__.Multitest object at 0x10550adf0>
    i 0 (pid: 3462)
    Copied instance: <__mp_main__.Multitest object at 0x10550ae50>
    i 1 (pid: 3462)
    i 1 (pid: 3463)
    input 0
    input 1
    input 2
    input 3

    该技术来自 https://stackoverflow.com/a/1676328/361691

    关于python - 在多处理 apply_async 中维护实例状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61775270/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com