gpt4 book ai didi

python - 通过定期调用 `join()` 避免僵尸进程

转载 作者:太空宇宙 更新时间:2023-11-04 08:43:14 24 4
gpt4 key购买 nike

我正在用 Python 编写一个永远运行并随机接收请求的程序必须并行处理。每个请求可能需要几十分钟处理并给 CPU 带来一些负担,因此 asyncio 不是一种选择。为了每个请求我都会启动一个新的工作进程。

问题是,如果我不在 worker 完成后调用 join(),它变成了一个僵尸进程。

我目前的解决方案是定期迭代所有工作进程并调用join() 如果他们完成了。有没有比使用更优雅的方法multiprocessing.Queue.get() 超时?也许是事件驱动的方法?还是在这种情况下使用超时完全没问题?请看我的以下代码当前解决方案。

#!/usr/bin/env python3

import multiprocessing as mp
import queue
import random
import time
from typing import List


def main():
q = mp.Queue()
p_produce = mp.Process(target=produce, args=(q,))
p_receive = mp.Process(target=receive, args=(q,))

p_produce.start()
p_receive.start()

p_receive.join()
p_produce.join()


def produce(q: mp.Queue):
for i in range(10):
print(f"put({i})")
q.put(str(i))
time.sleep(random.uniform(2.0, 3.0))
q.put("EOF")


def receive(q: mp.Queue):
workers = [] # type: List[mp.Process]
while True:
to_join = [w for w in workers if not w.is_alive()]
for p_worker in to_join:
print(f"Join {p_worker.name}")
p_worker.join()
workers.remove(p_worker)

try:
request = q.get(block=True, timeout=1) # Is there a better way?
except queue.Empty:
continue

if request == "EOF":
break

p_worker = mp.Process(target=worker, args=(request,), name=request)
p_worker.start()
workers.append(p_worker)

for p_worker in workers:
print(f"Join {p_worker.name}")
p_worker.join()


def worker(name: str):
print(f"Working on {name}")
time.sleep(random.uniform(2.0, 3.0))


if __name__ == "__main__":
main()

最佳答案

正如@Giannis 在评论中建议的那样,您正在从头开始 reshape 流程管理器。坚持Python自带的东西,你对使用multiprocessing.Pool有什么异议吗?如果是,是什么?

执行此操作的通常方法是选择您希望同时运行的最大工作进程数。说,

NUM_WORKERS = 4

然后将其作为您的 receive() 函数的替代:

def receive(q: mp.Queue):
pool = mp.Pool(NUM_WORKERS)
while True:
request = q.get()
if request == "EOF":
break
pool.apply_async(worker, args=(request,))
pool.close()
pool.join()

NUM_WORKERS 进程创建一次,并在任务之间重复使用。如果出于某种原因您需要(或想要)每个任务的全新进程,您只需将 maxtasksperchild=1 添加到 Pool 构造函数中。

如果出于某种原因你需要知道每个任务何时完成,你可以,例如,将 callback= 参数添加到 apply_async() 调用并编写一个任务结束时将调用的小函数(它将接收作为参数的 worker() 函数返回的内容)。

魔鬼在守护进程中

所以事实证明,您的真实应用程序中的工作进程想要(无论出于何种原因)创建自己的进程,而 Pool 创建的进程不能这样做。它们被创建为“守护进程”进程。来自文档:

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits.

非常清晰;-) 这里有一个精心设计的方法来创建你自己的 Pool workalike 来创建非守护进程,但对我来说太复杂了:

Python Process Pool non-daemonic?

回到您已经知道有效的原始设计,我只是将其更改为将定期加入工作进程的逻辑与操作队列的逻辑分开。从逻辑上讲,他们之间确实没有任何关系。具体来说,创建一个“后台线程”来加入对我来说很有意义:

def reap(workers, quit):
from time import sleep
while not quit.is_set():
to_join = [w for w in workers if not w.is_alive()]
for p_worker in to_join:
print(f"Join {p_worker.name}")
p_worker.join()
workers.remove(p_worker)
sleep(2) # whatever you like
for p_worker in workers:
print(f"Join {p_worker.name}")
p_worker.join()

def receive(q: mp.Queue):
import threading
workers = [] # type: List[mp.Process]
quit = threading.Event()
reaper = threading.Thread(target=reap, args=(workers, quit))
reaper.start()

while True:
request = q.get()
if request == "EOF":
break
p_worker = mp.Process(target=worker, args=(request,), name=request)
p_worker.start()
workers.append(p_worker)

quit.set()
reaper.join()

我碰巧知道 list.append()list.remove() 在 CPython 中是线程安全的,所以没有需要 用锁来保护这些操作。但如果您添加一个也不会造成伤害。

还有一个可以试试

虽然 Pool 创建的进程是守护进程,但类似的 concurrent.futures.ProcessPoolExecutor 创建的进程似乎不是。所以我的第一个建议的这个简单变​​体可能对你有用(或者可能不对 ;-) ):

NUM_WORKERS = 4

def receive(q: mp.Queue):
import concurrent.futures as cf
with cf.ProcessPoolExecutor(NUM_WORKERS) as e:
while True:
request = q.get()
if request == "EOF":
break
e.submit(worker, request)

如果这对您有用,那么很难想象还有什么比这更简单的了。

关于python - 通过定期调用 `join()` 避免僵尸进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43131145/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com