gpt4 book ai didi

python - 提交代码以执行到 concurrent.futures.ProcessPool 中的所有进程

转载 作者:行者123 更新时间:2023-12-03 17:17:56 25 4
gpt4 key购买 nike

语境:

  • 使用 concurrent.futures.process.ProcessPool 的 Python 应用程序服务器执行代码
  • 我们有时想在不重启整个服务器进程的情况下热重载导入的代码

  • (是的,我知道 importlib.reloadcaveats)
    为了让它工作,我想我必须执行 importlib.reload在每个 multiprocessing由进程池管理的进程。
    有没有办法向 提交内容?全部 进程池中的进程?

    最佳答案

    我不知道这将如何与您提到的热重新加载尝试有关,但是您真正提出的一般问题是可以回答的。

    Is there a way to submit something to all processes in a process pool?


    这里的挑战在于确保真正所有流程都得到这个 something一次且仅一次,在每个进程都得到它之前不会进一步执行。
    您可以在 multiprocessing.Barrier(parties[, action[, timeout]]) 的帮助下获得此类必要的同步。 .屏障将阻止各方调用 barrier.wait()直到每一方都这样做,然后立即释放它们。
    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor


    def foo(x):
    for _ in range(int(42e4)):
    pass
    return x


    def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


    def init_barrier(barrier):
    globals()['barrier'] = barrier


    if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
    MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
    print(list(executor.map(foo, range(10))))
    # then something for all processes
    futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
    for f in futures:
    f.result()

    print(list(executor.map(foo, range(10))))
    示例输出:
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    ForkProcess-3 --- reloading something and waiting.
    ForkProcess-2 --- reloading something and waiting.
    ForkProcess-1 --- reloading something and waiting.
    ForkProcess-4 --- reloading something and waiting.
    ForkProcess-1 --- released.
    ForkProcess-4 --- released.
    ForkProcess-3 --- released.
    ForkProcess-2 --- released.
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    Process finished with exit code 0
    如果您同意保留 barrier全局和 multiprocessing.get_context()._name返回 "fork" ,您不需要使用 initializer因为全局变量将通过 fork 被继承和访问。

    关于python - 提交代码以执行到 concurrent.futures.ProcessPool 中的所有进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64392998/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com