- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
<强>1。为什么以下使用 concurrent.futures
模块的 Python 代码会永远挂起?
import concurrent.futures
class A:
def f(self):
print("called")
class B(A):
def f(self):
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
executor.submit(super().f)
if __name__ == "__main__":
B().f()
调用引发了一个不可见的异常[Errno 24] Too many open files
(要查看它,请将 executor.submit(super().f)
行替换为print(executor.submit(super().f).exception())
).
但是,将 ProcessPoolExecutor
替换为 ThreadPoolExecutor
会按预期打印“called”。
<强>2。为什么以下使用 multiprocessing.pool
模块的 Python 代码会引发异常 AssertionError: daemonic processes are not allowed to have children
?
import multiprocessing.pool
class A:
def f(self):
print("called")
class B(A):
def f(self):
pool = multiprocessing.pool.Pool(2)
pool.apply(super().f)
if __name__ == "__main__":
B().f()
但是,将 Pool
替换为 ThreadPool
会按预期打印“called”。
环境:CPython 3.7,MacOS 10.14。
最佳答案
concurrent.futures.ProcessPoolExecutor
和 multiprocessing.pool.Pool
使用 multiprocessing.queues.Queue
将工作函数对象从调用者传递到工作进程,Queue
使用 pickle
模块进行序列化/反序列化,但未能正确处理子类实例绑定(bind)的方法对象:
f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)
输出:
<bound method A.f of <__main__.B object at 0x104b24da0>>
<bound method B.f of <__main__.B object at 0x104cfab38>>
A.f
变为 B.f
,这实际上在工作进程中创建了无限递归调用 B.f
到 B.f
。
pickle.dumps
利用绑定(bind)方法对象的 __reduce__
方法,IMO,its implementation , 没有考虑这种场景,它不关心真正的 func
对象,而只是尝试从实例 self
obj (B()
) 的简单名称 (f
),结果是 B.f
,很可能是一个错误。
好消息是,我们知道问题出在哪里,我们可以通过实现我们自己的缩减函数来解决它,该函数尝试从原始函数 (A.f
) 和实例 obj 重新创建绑定(bind)方法对象(B()
):
import types
import copyreg
import multiprocessing
def my_reduce(obj):
return (obj.__func__.__get__, (obj.__self__,))
copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)
我们可以这样做,因为绑定(bind)方法是一个描述符。
ps:我已经备案a bug report .
关于python - 为什么 ProcessPoolExecutor 和 Pool 会因 super() 调用而崩溃?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56609847/
python ProcessPoolExecutor 在命令行中工作,但添加到函数后不运行 它是这样工作的 from concurrent import futures def multi_proce
如果父进程因任何原因终止,有没有办法使 concurrent.futures.ProcessPoolExecutor 中的进程终止? 一些细节:我在处理大量数据的作业中使用 ProcessPoolEx
这是我关于 stackoverflow 的第一个问题。我基本上能够在这里找到我需要知道的东西。顺便说一句,非常感谢。 但是。如果我尝试终止我的 ProcessPoolExecutor,它只会在生成的整
我有一个代码,它从 gstorage 下载文件,将它们转储到 json,然后将该 json 转为 csv,然后转为 parquet,最后上传到 aws s3(不要问为什么我不是写它的人)。 我从我的日
ESPNP 播放器免费 class ESPNPlayerFree: def __init__(self, player_id, match_id, match_id_team): ... 团队列表1:
在本文档 ( https://pymotw.com/3/concurrent.futures/ ) 中说: “ProcessPoolExecutor 的工作方式与 ThreadPoolExecutor
我正在创建一个多处理程序来处理多个批处理,但我的日志记录无法将批处理记录到日志文件中,只会记录根 log.info,如何设置日志记录以正确打印到日志文件? 日志只会打印这样一行"INFO:root:t
我正在尝试使用一个单独的进程通过并发 future 流式传输数据。然而,另一方面,有时对方会停止数据馈送。但只要我重新启动这个 threadable 然后它就会再次工作。所以我设计了这样的东西,以便能
设置 我设置了一个函数来接收多个关键字参数: def process(image, folder, param1, param2, param3): do_things return
from concurrent.futures import ProcessPoolExecutor import os import time def parInnerLoop(item):
python 3.6.6 这是代码: import asyncio import time from concurrent.futures import ProcessPoolExecutor exe
我是一般并行化的新手,特别是 concurrent.futures。我想对我的脚本进行基准测试并比较使用线程和进程之间的差异,但我发现我什至无法运行它,因为在使用 ProcessPoolExecuto
代码: if __name__ == "__main__": p = ProcessPoolExecutor() p.submit(lambda x: print(x), "somet
代码: if __name__ == "__main__": p = ProcessPoolExecutor() p.submit(lambda x: print(x), "somet
我想在多进程环境中记录到单个文件。我可以得到 sample code从 python 日志记录手册开始工作。但是当我替换为 ProcessPoolExecutor 时,它不起作用。 # work
我正在尝试使用新的 Tornado queue对象以及 concurrent.futures允许我的网络服务器将 CPU 密集型任务传递给其他进程。我想访问从 concurrent.futures 模
我有一个 python 函数正在调用我无法控制或更新的 C 库。不幸的是,C 库存在间歇性错误,有时会挂起。为了防止我的应用程序也挂起,我尝试隔离 ThreadPoolExecutor 或 Proce
我最近开始使用 Python 的多线程和多处理功能。 我尝试编写代码,使用生产者/消费者方法从 JSON 日志文件中读取 block ,将这些 block 作为事件写入队列,然后启动一组将从该队列中轮
我有一大堆必须以某种方式处理的元素。我知道可以通过以下方式使用多处理过程来完成: pr1 = Process(calculation_function, (args, )) pr1.start() p
if __name__ == '__main__': MATCH_ID = str(doc_ref2.id) MATCH_ID_TEAM = doc_ref3.id with
我是一名优秀的程序员,十分优秀!