- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在用 Python 编写一个永远运行并随机接收请求的程序必须并行处理。每个请求可能需要几十分钟处理并给 CPU 带来一些负担,因此 asyncio 不是一种选择。为了每个请求我都会启动一个新的工作进程。
问题是,如果我不在 worker 完成后调用 join()
,它变成了一个僵尸进程。
我目前的解决方案是定期迭代所有工作进程并调用join()
如果他们完成了。有没有比使用更优雅的方法multiprocessing.Queue.get()
超时?也许是事件驱动的方法?还是在这种情况下使用超时完全没问题?请看我的以下代码当前解决方案。
#!/usr/bin/env python3
import multiprocessing as mp
import queue
import random
import time
from typing import List
def main():
q = mp.Queue()
p_produce = mp.Process(target=produce, args=(q,))
p_receive = mp.Process(target=receive, args=(q,))
p_produce.start()
p_receive.start()
p_receive.join()
p_produce.join()
def produce(q: mp.Queue):
for i in range(10):
print(f"put({i})")
q.put(str(i))
time.sleep(random.uniform(2.0, 3.0))
q.put("EOF")
def receive(q: mp.Queue):
workers = [] # type: List[mp.Process]
while True:
to_join = [w for w in workers if not w.is_alive()]
for p_worker in to_join:
print(f"Join {p_worker.name}")
p_worker.join()
workers.remove(p_worker)
try:
request = q.get(block=True, timeout=1) # Is there a better way?
except queue.Empty:
continue
if request == "EOF":
break
p_worker = mp.Process(target=worker, args=(request,), name=request)
p_worker.start()
workers.append(p_worker)
for p_worker in workers:
print(f"Join {p_worker.name}")
p_worker.join()
def worker(name: str):
print(f"Working on {name}")
time.sleep(random.uniform(2.0, 3.0))
if __name__ == "__main__":
main()
最佳答案
正如@Giannis 在评论中建议的那样,您正在从头开始 reshape 流程管理器。坚持Python自带的东西,你对使用multiprocessing.Pool
有什么异议吗?如果是,是什么?
执行此操作的通常方法是选择您希望同时运行的最大工作进程数。说,
NUM_WORKERS = 4
然后将其作为您的 receive()
函数的替代:
def receive(q: mp.Queue):
pool = mp.Pool(NUM_WORKERS)
while True:
request = q.get()
if request == "EOF":
break
pool.apply_async(worker, args=(request,))
pool.close()
pool.join()
NUM_WORKERS
进程创建一次,并在任务之间重复使用。如果出于某种原因您需要(或想要)每个任务的全新进程,您只需将 maxtasksperchild=1
添加到 Pool
构造函数中。
如果出于某种原因你需要知道每个任务何时完成,你可以,例如,将 callback=
参数添加到 apply_async()
调用并编写一个任务结束时将调用的小函数(它将接收作为参数的 worker()
函数返回的内容)。
所以事实证明,您的真实应用程序中的工作进程想要(无论出于何种原因)创建自己的进程,而 Pool
创建的进程不能这样做。它们被创建为“守护进程”进程。来自文档:
When a process exits, it attempts to terminate all of its daemonic child processes.
Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits.
非常清晰;-) 这里有一个精心设计的方法来创建你自己的 Pool
workalike 来创建非守护进程,但对我来说太复杂了:
Python Process Pool non-daemonic?
回到您已经知道有效的原始设计,我只是将其更改为将定期加入工作进程的逻辑与操作队列的逻辑分开。从逻辑上讲,他们之间确实没有任何关系。具体来说,创建一个“后台线程”来加入对我来说很有意义:
def reap(workers, quit):
from time import sleep
while not quit.is_set():
to_join = [w for w in workers if not w.is_alive()]
for p_worker in to_join:
print(f"Join {p_worker.name}")
p_worker.join()
workers.remove(p_worker)
sleep(2) # whatever you like
for p_worker in workers:
print(f"Join {p_worker.name}")
p_worker.join()
def receive(q: mp.Queue):
import threading
workers = [] # type: List[mp.Process]
quit = threading.Event()
reaper = threading.Thread(target=reap, args=(workers, quit))
reaper.start()
while True:
request = q.get()
if request == "EOF":
break
p_worker = mp.Process(target=worker, args=(request,), name=request)
p_worker.start()
workers.append(p_worker)
quit.set()
reaper.join()
我碰巧知道 list.append()
和 list.remove()
在 CPython 中是线程安全的,所以没有需要 用锁来保护这些操作。但如果您添加一个也不会造成伤害。
虽然 Pool
创建的进程是守护进程,但类似的 concurrent.futures.ProcessPoolExecutor
创建的进程似乎不是。所以我的第一个建议的这个简单变体可能对你有用(或者可能不对 ;-) ):
NUM_WORKERS = 4
def receive(q: mp.Queue):
import concurrent.futures as cf
with cf.ProcessPoolExecutor(NUM_WORKERS) as e:
while True:
request = q.get()
if request == "EOF":
break
e.submit(worker, request)
如果这对您有用,那么很难想象还有什么比这更简单的了。
关于python - 通过定期调用 `join()` 避免僵尸进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43131145/
我们已经有一个使用 AnyEvent 的库。它在内部使用 AnyEvent,并最终返回一个值(同步 - 不使用回调)。有什么方法可以将这个库与 Mojolicious 一起使用吗? 它的作用如下: #
我想从 XSD 文件生成带有 JAXB 的 Java 类。 问题是,我总是得到一些像这样的类(删除了命名空间): public static class Action { @X
我有一个关于 html 输入标签或 primefaces p:input 的问题。为什么光标总是自动跳转到输入字段。我的页面高度很高,因此您需要向下滚动。输入字段位于页面末尾,光标自动跳转(加载)到页
我今天在考虑面向对象设计,我想知道是否应该避免 if 语句。我的想法是,在任何需要 if 语句的情况下,您都可以简单地创建两个实现相同方法的对象。这两个方法实现只是原始 if 语句的两个可能的分支。
String graphNameUsed = graphName.getName(); if (graphType.equals("All") || graphType.equals(
我有一张友谊 table CREATE TABLE IF NOT EXISTS `friendList` ( `id` int(10) NOT NULL, `id_friend` int(10
上下文 Debian 64。Core 2 二人组。 摆弄循环。我使用了同一循环的不同变体,但我希望尽可能避免条件分支。 但是,即使我认为它也很难被击败。 我考虑过 SSE 或位移位,但它仍然需要跳转(
我最近在 Java 中创建了一个方法来获取字符串的排列,但是当字符串太长时它会抛出这个错误:java.lang.OutOfMemoryError: Java heap space我确信该方法是有效的,
我正在使用 (C++) 库,其中需要使用流初始化对象。库提供的示例代码使用此代码: // Declare the input stream HfstInputStream *in = NULL; tr
我有一个 SQL 查询,我在 WHERE 子句中使用子查询。然后我需要再次使用相同的子查询将其与不同的列进行比较。 我假设没有办法在子查询之外访问“emp_education_list li”? 我猜
我了解到在 GUI 线程上不允许进行网络操作。对我来说还可以。但是为什么在 Dialog 按钮点击回调上使用这段代码仍然会产生 NetworkOnMainThreadException ? new T
有没有办法避免在函数重定向中使用 if 和硬编码字符串,想法是接收一个字符串并调用适当的函数,可能使用模板/元编程.. #include #include void account() {
我正在尝试避免客户端出现 TIME_WAIT。我连接然后设置 O_NONBLOCK 和 SO_REUSEADDR。我调用 read 直到它返回 0。当 read 返回 0 时,errno 也为 0。我
我正在开发 C++ Qt 应用程序。为了在应用程序或其连接的设备出现故障时帮助用户,程序导出所有内部设置并将它们存储在一个普通文件(目前为 csv)中。然后将此文件发送到公司(例如通过邮件)。 为避免
我有一组具有公共(public)父类(super class)的 POJO。这些存储在 superclass 类型的二维数组中。现在,我想从数组中获取一个对象并使用子类 的方法。这意味着我必须将它们转
在我的代码中,当 List 为 null 时,我通常使用这种方法来避免 for 语句中的 NullPointerException: if (myList != null && myList.size
我正在尝试避免客户端出现 TIME_WAIT。我连接然后设置 O_NONBLOCK 和 SO_REUSEADDR。我调用 read 直到它返回 0。当 read 返回 0 时,errno 也为 0。我
在不支持异常的语言和/或库中,许多/几乎所有函数都会返回一个值,指示其操作成功或失败 - 最著名的例子可能是 UN*X 系统调用,例如 open( ) 或 chdir(),或一些 libc 函数。 无
我尝试按值提取行。 col1 df$col1[col1 == "A"] [1] "A" NA 当然我只想要“A”。如何避免 R 选择 NA 值?顺便说一句,我认为这种行为非常危险,因为很多人都会陷入
我想将两个向量合并到一个数据集中,并将其与函数 mutate 集成为 5 个新列到现有数据集中。这是我的示例代码: vector1% rowwise()%>% mutate(vector2|>
我是一名优秀的程序员,十分优秀!