- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失败。该代码的目的是创建一个支持在 macOS 下调用 size()
的自定义队列对象,因此继承了 Queue
对象并获取多处理的上下文。
import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep
class Q(Queue):
def __init__(self):
super().__init__(ctx=multiprocessing.get_context())
self.size = 1
def call(self):
return print(self.size)
def foo(q):
q.call()
if __name__ == '__main__':
multiprocessing.set_start_method('spawn') # this would fail
# multiprocessing.set_start_method('fork') # this would succeed
q = Q()
p = Process(target=foo, args=(q,))
p.start()
p.join(timeout=1)
使用“spawn”时输出的错误消息如下所示。
Process Process-1:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
q.call()
File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
return print(self.size)
AttributeError: 'Q' object has no attribute 'size'
看来子进程认为self.size
不是代码执行所必需的,所以没有复制。我的问题是为什么会发生这种情况?
在 macOS Catalina 10.15.6、Python 3.8.5 下测试的代码片段
最佳答案
问题是生成的进程没有共享资源,因此要为每个进程正确地重新创建队列实例,您需要添加序列化和反序列化方法。这是一个工作代码:
# Portable queue
# The idea of Victor Terron used in Lemon project (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added to share Queue instance between processes correctly.
import multiprocessing
import multiprocessing.queues
class SharedCounter(object):
""" A synchronized shared counter.
The locking done by multiprocessing.Value ensures that only a single
process or thread may read or write the in-memory ctypes object. However,
in order to do n += 1, Python performs a read followed by a write, so a
second process may read the old value before the new one is written by the
first process. The solution is to use a multiprocessing.Lock to guarantee
the atomicity of the modifications to Value.
This class comes almost entirely from Eli Bendersky's blog:
http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
"""
def __init__(self, n = 0):
self.count = multiprocessing.Value('i', n)
def __getstate__(self):
return (self.count,)
def __setstate__(self, state):
(self.count,) = state
def increment(self, n = 1):
""" Increment the counter by n (default = 1) """
with self.count.get_lock():
self.count.value += n
@property
def value(self):
""" Return the value of the counter """
return self.count.value
class Queue(multiprocessing.queues.Queue):
""" A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
self._counter = SharedCounter(0)
def __getstate__(self):
return super().__getstate__() + (self._counter,)
def __setstate__(self, state):
super().__setstate__(state[:-1])
self._counter = state[-1]
def put(self, *args, **kwargs):
super().put(*args, **kwargs)
self._counter.increment(1)
def get(self, *args, **kwargs):
item = super().get(*args, **kwargs)
self._counter.increment(-1)
return item
def qsize(self):
""" Reliable implementation of multiprocessing.Queue.qsize() """
return self._counter.value
def empty(self):
""" Reliable implementation of multiprocessing.Queue.empty() """
return not self.qsize()
关于multiprocessing - 为什么在 Python3.8+ "fork"中使用 "spawn"有效但使用 `multiprocessing` 失败?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65098398/
有3个 repo : 有 OpenAI Baselines 存储库:https://github.com/openai/baselines . 我有它的 fork :https://github.co
我试图了解在调用 fork() 后复制文件描述符的含义及其对争用的可能影响。 在“Linux 编程接口(interface)”24.2.1 (p517) 中: When a fork() is per
我对 systemd 如何跟踪主进程存在后仍然存在的主进程的子进程感兴趣? 最佳答案 Systemd 为此使用了 cgroups。在启动任何使用服务文件定义的可执行文件之前,systemd 会创建一个
这是输出--- 家长:我的pid是4525 parent :我 parent 的 pid 是 3350 parent 开始- 4525 3350 fork 前 fork 前 child 4526 45
我之前 fork 了 jockm/vert.x 并向他发送了拉取请求。现在我想 fork vert-x/vert.x (jockm/vert.x 的上游)并向他们发送不同的拉取请求。但是,当我单击“F
我想控制从 Perl 脚本派生的进程的名称 a。理想情况下它会像这样: ./forker.pl | ... | | fork("forked.pl"); |\ | \ | `--------\ |
我知道 fork() 在更高级别上做什么。我想知道的是这个—— 一旦有 fork 调用,trap 指令就会跟随并且控制跳转以执行 fork “处理程序”。现在,这个创建子进程的处理程序如何通过创建另一
我正在研究操作系统测验,但我不知道输出什么 if(fork()) fork() 会产生。有人可以解释吗? 我不明白这一行: if(fork()) 编辑: 我所说的“输出”是指如果执行此代码,将
这个问题在这里已经有了答案: Why does this program print "forked!" 4 times? (6 个答案) 关闭 3 年前。 在 C 中,fork() 函数将为父进程
有什么方法可以区分程序中不同 fork() 函数创建的子进程。 global variable i; SIGCHLD handler function() { i--; } handle() {
我正在重新开发一个系统,该系统将通过 http 向多个供应商之一发送消息。原来是perl脚本,重新开发很可能也会用perl。 在旧系统中,同时运行多个 perl 脚本,每个供应商运行 5 个。当一条消
Git 的新手,仍然有点困惑。我在 github 上 fork 了一个项目,想将项目所有者最近对原始项目所做的一些更改引入/merge 到我的 fork 中。这可能吗?该项目是只读的,但基本上,我想让
根据维基百科(可能是错误的) When a fork() system call is issued, a copy of all the pages corresponding to the par
我需要帮助了解如何在 Go 中妖魔化进程。 package main import ( "fmt" "os" ) func start() { var procAttr os.Pro
我已经执行了这段代码。我知道消息的顺序是任意顺序的(因为我明确没有使用信号量)我的程序流程如何?为什么? 父级被执行,因此“baz”被打印一次。有人可以解释为什么不打印“bar”吗?为什么我得到“fo
这个问题已经有答案了: Why does this program print "forked!" 4 times? (6 个回答) 已关闭 5 年前。 我对 fork 进程有疑问。我有一个代码是 i
我在弄清楚如何使用在不同进程之间创建的列表时遇到了麻烦。我所拥有的是: FileList.h - 我创建的列表 #include "Node.h" typedef struct FileList {
好吧,所以我一直在 stackoverflow 上查找这个问题,并且肯定在谷歌上搜索了半个小时,但我得到的答案似乎与我真正想做的事情几乎没有任何关系,希望有人能提供帮助我在这方面,代码如下: int
我正在尝试了解 fork-join 的工作原理。 维基百科有以下合并排序示例,其中左半部分被 fork ,右半部分由当前线程处理。 mergesort(A, lo, hi): if lo t
下面的代码输出了一系列的过程,它到底做了什么: _PARENT_ / \ /
我是一名优秀的程序员,十分优秀!