- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想使用 multiprocessing.Value
+ multiprocessing.Lock
在不同的进程之间共享一个计数器。例如:
import itertools as it
import multiprocessing
def func(x, val, lock):
for i in range(x):
i ** 2
with lock:
val.value += 1
print('counter incremented to:', val.value)
if __name__ == '__main__':
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
with multiprocessing.Pool() as pool:
pool.starmap(func, ((i, v, lock) for i in range(25)))
print(counter.value())
这将抛出以下异常:
RuntimeError: Synchronized objects should only be shared between processes through inheritance
我最困惑的是一个相关的(尽管不完全相似)模式与 multiprocessing.Process()
一起工作:
if __name__ == '__main__':
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
procs = [multiprocessing.Process(target=func, args=(i, v, lock))
for i in range(25)]
for p in procs: p.start()
for p in procs: p.join()
现在,我认识到这是两个截然不同的事情:
cpu_count()
的工作进程,并在它们之间拆分一个可迭代的 range(25)
就是说:如何以这种方式与 pool.starmap()
(或 pool.map()
)共享实例?
我看过类似的问题here , here , 和 here ,但这些方法似乎并不适合 .map()
/.starmap()
,不管 Value
是否使用 ctypes.c_int
.
我意识到这种方法在技术上是可行的:
def func(x):
for i in range(x):
i ** 2
with lock:
v.value += 1
print('counter incremented to:', v.value)
v = None
lock = None
def set_global_counter_and_lock():
"""Egh ... """
global v, lock
if not any((v, lock)):
v = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
if __name__ == '__main__':
# Each worker process will call `initializer()` when it starts.
with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
pool.map(func, range(25))
这真的是解决此问题的最佳实践方法吗?
最佳答案
使用 Pool
时出现的 RuntimeError
是因为池方法的参数在通过(池内部)队列发送到工作进程之前被腌制。您尝试使用哪种池方法在这里无关紧要。当您只使用 Process
时不会发生这种情况,因为不涉及队列。您可以使用 pickle.dumps(multiprocessing.Value('i', 0))
重现错误。
您的最后一个代码片段并不像您认为的那样工作。您不是共享 Value
,而是为每个子进程重新创建独立的计数器。
如果您使用的是 Unix 并使用默认的启动方法“fork”,您只需不将共享对象作为参数传递到池方法中即可。您的子进程将通过 fork 继承全局变量。使用进程启动方法“spawn”(默认 Windows 和 macOS with Python 3.8+)或“forkserver”,您必须在 Pool
期间使用 initializer
实例化,让子进程继承共享对象。
请注意,您在这里不需要额外的 multiprocessing.Lock
,因为 multiprocessing.Value
默认带有一个您可以使用的内部锁。
import os
from multiprocessing import Pool, Value #, set_start_method
def func(x):
for i in range(x):
assert i == i
with cnt.get_lock():
cnt.value += 1
print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')
def init_globals(counter):
global cnt
cnt = counter
if __name__ == '__main__':
# set_start_method('spawn')
cnt = Value('i', 0)
iterable = [10000 for _ in range(10)]
with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
pool.map(func, iterable)
assert cnt.value == 100000
可能还值得注意的是,您不需要在所有情况下都共享 计数器。如果您只需要跟踪某件事发生的频率,一个选择是在计算期间保留单独的工作人员本地计数器,您在最后总结。对于在并行计算本身期间不需要同步的频繁计数器更新,这可能会显着提高性能。
关于python - 与 multiprocessing.Pool 共享一个计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53617425/
我正在使用 boost.pool,但我不知道何时使用 boost::pool<>::malloc和 boost::pool<>::ordered_malloc ? 所以, boost::pool<>:
我目前正在尝试从 anaconda 中的 spy 控制台运行并行代码。我相信问题可能出在我的计算机不允许 anaconda 控制 CPU 核心上,但我不知道如何解决这个问题。 另一个有趣的点是,当我运
在了解 Python 的 multiprocessing 包(对于 Python 3.4 )时,我注意到 multiprocessing.Pool 是在类 BaseContext 中定义的 上下文.p
我有这样的程序: from multiprocessing import Pool import time def f(x): # I make a heavy code here to take t
我有一个模块 A,它通过获取数据并将其发送到模块 B、C、D 等进行分析,然后将它们的结果结合在一起来执行基本的 map/reduce。 但是模块 B、C、D 等似乎不能自己创建多处理池,否则我得到
所以我有一个脚本可以连接到大约 700 个设备并执行一系列命令,然后退出。我开始使用 Multiprocessing.Pool 和 Pool.map 来减少脚本的运行时间,并允许我同时登录多个设备。
在下面的链接中有对 Pool 类的 map 方法的解释。 它似乎阻塞直到结果准备好。这意味着不需要执行 pool.close(); pool.join() 在运行 pool.map 之后,但是它在 t
context 是 class multiprocessing.pool.Pool 构造函数中的可选参数。 Documentation只说: context can be used to specif
这个问题在这里已经有了答案: 关闭 11 年前。 Possible Duplicate: What's the difference between sending -release or -dra
不确定这是否是正确的论坛。 libvirt 页面链接在这里。如果这需要张贴在不同的地方请告诉我。 virsh pool-define-as 和 create-as 有什么区别?阅读 virsh 的手册
谁能告诉我Spring Cloud Feign Client是否提供或支持Http连接池,如果可以,那么如何配置诸如池大小的设置?我似乎在官方文档中找不到此内容。谢谢你。 最佳答案 通过调查,我将尝试
我在尝试运行 Flask 应用程序时遇到了一些困难。我收到以下导入错误: File "/db/mysql_utils.py", line 2, in import mysql.conne
我有一个 Node 项目,在其中使用 pg-pool 库。我已在我的依赖项中包含以下内容: "@types/pg-pool": "0.0.3", "pg": "^7.3.0", "pg-format"
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
这个问题在这里已经有了答案: Concurrent.futures vs Multiprocessing in Python 3 (6 个答案) 关闭 5 年前。 请给我解释一下这两个类有什么区别?
multiprocessing 的文档states以下关于Pool.join() : Wait for the worker processes to exit. One must call clos
我找到了一些资源,但我不确定我是否理解。 我找到的一些资源是: http://help.sap.com/saphelp_nw70/helpdata/en/fc/eb2ff3358411d1829f00
我的 Javafx 应用程序抛出许多非法状态异常,我尚未能够在源中跟踪触发器。 任何人都可以指导我导致此问题的原因以及我应该在哪里查找原因。我很难在这里展示一些代码,因为我不知道是什么原因造成的。 任
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我目前有一个连接到我的主数据库的开放池,它运行良好。但是现在,我想为另一个数据库打开一个新池。我完全按照设置第一个池的方式设置了新池,显然我编辑了数据库名称等。加载 setupHikari() 方法时
我是一名优秀的程序员,十分优秀!