- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个接受列表的程序。对于此列表中的每个值,它都会检索另一个列表并处理另一个列表。
基本上,它是一个 3 深度的树,需要在每个节点进行可能昂贵的处理。
每个节点都需要能够处理其子节点的结果。
我想做的是map
来自第一层的输入list
到每个节点的结果。但在每个过程中,我想 map
下一层的结果。
我担心的是每一层都有自己的最大 worker 数量。如果可能的话,我希望他们共享一个进程池,否则所有进程切换都会影响性能。
有没有办法,使用 concurrency.futures
或者其他方法,让每一层共享相同的进程池?
一个例子是:
def main():
my_list = [1,2,3,4]
with concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor:
results = executor.map(my_function, zip(my_list, [executor] * len(my_list)))
#process results
def my_function(args):
list = args[0]
executor = args[1]
new_list = process(list)
results = executor.map(second_function, new_list)
#process results
#return processed results
def second_function(values):
...
这样,每个子进程都会从同一个池中提取。
或者,我可以做类似的事情(但不完全是)
import concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor
并每次调用 executor
从同一个进程池中提取?
最佳答案
问题是您的进程池有 4 个线程,并且您尝试等待大约 20 个线程..因此没有足够的线程来执行您想要的操作。
换句话说:my_function
在工作线程中执行。当调用 map 时,该线程会阻塞。缺少一个线程可用于执行对映射的调用。 future 阻塞了这个线程。
我的解决方案是使用返回 future 的 yield
和 yield from
语句。所以我的解决办法就是去掉futures和thread的阻塞。创建所有 future,然后发生yield 来中断当前执行并释放线程。然后该线程可以执行 map future。一旦 future 完成,注册的回调就会执行 next()
生成器步骤。
要解决现有对象的代理问题,必须首先解决此问题:How to properly set up multiprocessing proxy objects for objects that already exist
因此,我们要执行以下递归:[1,[2,[3,3,3],2],1],0,0]
列表。
我们可以期待以下输出:
tasks: [[1, [2, [3, 3, 3], 2], 1], 0, 0]
tasks: [1, [2, [3, 3, 3], 2], 1]
tasks: 0
tasks: 0
tasks: 1
tasks: [2, [3, 3, 3], 2]
tasks: 1
tasks: 2
tasks: [3, 3, 3]
tasks: 2
tasks: 3
tasks: 3
tasks: 3
v: 15
这里的代码引入了一个启用递归的ThreadPoolExecutor:
import traceback
from concurrent.futures.thread import *
from concurrent.futures import *
from concurrent.futures._base import *
##import hanging_threads
class RecursiveThreadPoolExecutor(ThreadPoolExecutor):
# updated version here: https://gist.github.com/niccokunzmann/9170072
def _submit(self, fn, *args, **kwargs):
return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)
def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Returns:
A Future representing the given call.
"""
real_future = Future()
def generator_start():
try:
## print('start', fn, args, kwargs)
generator = fn(*args, **kwargs)
## print('generator:', generator)
def generator_next():
try:
## print('next')
try:
future = next(generator)
except StopIteration as stop:
real_future.set_result(stop.args[0])
else:
if future is None:
self._submit(generator_next)
else:
future.add_done_callback(lambda future: generator_next())
except:
traceback.print_exc()
self._submit(generator_next)
## print('next submitted 1')
except:
traceback.print_exc()
self._submit(generator_start)
return real_future
def recursive_map(self, fn, *iterables, timeout=None):
"""Returns a iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
yield from fs
return fs
return result_iterator()
if __name__ == '__main__':
def f(args):
executor, tasks = args
print ('tasks:', tasks)
if type(tasks) == int:
return tasks
# waiting for all futures without blocking the thread
futures = yield from executor.recursive_map(f, [(executor, task) for task in tasks])
return sum([future.result() for future in futures])
with RecursiveThreadPoolExecutor(max_workers = 1) as executor:
r = executor.map(f, [(executor, [[1,[2,[3,3,3],2],1],0,0],)] * 1)
import time
time.sleep(0.1)
for v in r:
print('v: {}'.format(v))
可以在此处找到更新版本:https://gist.github.com/niccokunzmann/9170072
遗憾的是,我现在无法使用一些多处理内容来为进程实现此功能。您可以做到这一点,唯一需要做的就是为 generator_start
和 generator_next
函数创建一个代理对象。如果您这样做,请告诉我。
为了解决方法的代理问题,这个问题也将得到回答:How to properly set up multiprocessing proxy objects for objects that already exist
关于python - 我可以在 Future 中使用 ProcessPoolExecutor 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21960514/
我在网上搜索但没有找到任何合适的文章解释如何使用 javascript 使用 WCF 服务,尤其是 WebScriptEndpoint。 任何人都可以对此给出任何指导吗? 谢谢 最佳答案 这是一篇关于
我正在编写一个将运行 Linux 命令的 C 程序,例如: cat/etc/passwd | grep 列表 |剪切-c 1-5 我没有任何结果 *这里 parent 等待第一个 child (chi
所以我正在尝试处理文件上传,然后将该文件作为二进制文件存储到数据库中。在我存储它之后,我尝试在给定的 URL 上提供文件。我似乎找不到适合这里的方法。我需要使用数据库,因为我使用 Google 应用引
我正在尝试制作一个宏,将下面的公式添加到单元格中,然后将其拖到整个列中并在 H 列中复制相同的公式 我想在 F 和 H 列中输入公式的数据 Range("F1").formula = "=IF(ISE
问题类似于this one ,但我想使用 OperatorPrecedenceParser 解析带有函数应用程序的表达式在 FParsec . 这是我的 AST: type Expression =
我想通过使用 sequelize 和 node.js 将这个查询更改为代码取决于在哪里 select COUNT(gender) as genderCount from customers where
我正在使用GNU bash,版本5.0.3(1)-发行版(x86_64-pc-linux-gnu),我想知道为什么简单的赋值语句会出现语法错误: #/bin/bash var1=/tmp
这里,为什么我的代码在 IE 中不起作用。我的代码适用于所有浏览器。没有问题。但是当我在 IE 上运行我的项目时,它发现错误。 而且我的 jquery 类和 insertadjacentHTMl 也不
我正在尝试更改标签的innerHTML。我无权访问该表单,因此无法编辑 HTML。标签具有的唯一标识符是“for”属性。 这是输入和标签的结构:
我有一个页面,我可以在其中返回用户帖子,可以使用一些 jquery 代码对这些帖子进行即时评论,在发布新评论后,我在帖子下插入新评论以及删除 按钮。问题是 Delete 按钮在新插入的元素上不起作用,
我有一个大约有 20 列的“管道分隔”文件。我只想使用 sha1sum 散列第一列,它是一个数字,如帐号,并按原样返回其余列。 使用 awk 或 sed 执行此操作的最佳方法是什么? Accounti
我需要将以下内容插入到我的表中...我的用户表有五列 id、用户名、密码、名称、条目。 (我还没有提交任何东西到条目中,我稍后会使用 php 来做)但由于某种原因我不断收到这个错误:#1054 - U
所以我试图有一个输入字段,我可以在其中输入任何字符,但然后将输入的值小写,删除任何非字母数字字符,留下“。”而不是空格。 例如,如果我输入: 地球的 70% 是水,-!*#$^^ & 30% 土地 输
我正在尝试做一些我认为非常简单的事情,但出于某种原因我没有得到想要的结果?我是 javascript 的新手,但对 java 有经验,所以我相信我没有使用某种正确的规则。 这是一个获取输入值、检查选择
我想使用 angularjs 从 mysql 数据库加载数据。 这就是应用程序的工作原理;用户登录,他们的用户名存储在 cookie 中。该用户名显示在主页上 我想获取这个值并通过 angularjs
我正在使用 autoLayout,我想在 UITableViewCell 上放置一个 UIlabel,它应该始终位于单元格的右侧和右侧的中心。 这就是我想要实现的目标 所以在这里你可以看到我正在谈论的
我需要与 MySql 等效的 elasticsearch 查询。我的 sql 查询: SELECT DISTINCT t.product_id AS id FROM tbl_sup_price t
我正在实现代码以使用 JSON。 func setup() { if let flickrURL = NSURL(string: "https://api.flickr.com/
我尝试使用for循环声明变量,然后测试cols和rols是否相同。如果是,它将运行递归函数。但是,我在 javascript 中执行 do 时遇到问题。有人可以帮忙吗? 现在,在比较 col.1 和
我举了一个我正在处理的问题的简短示例。 HTML代码: 1 2 3 CSS 代码: .BB a:hover{ color: #000; } .BB > li:after {
我是一名优秀的程序员,十分优秀!