- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试编写一个应用程序,它可以处理一系列数据库条目,使用这些条目进行 API 调用,返回值,并且如果 API JSON 响应的一个值为 True
则为 5电话,我想要这 5 个电话的列表。由于数据库条目有几千个条目,我想通过多处理
来实现这一点。但我是并行化的初学者,似乎我无法掌握它是如何工作的以及如何设置退出条件。这是我得到的:
from multiprocessing.dummy import Pool
import requests
def get_api_response(apikey, result, subscription_id):
r = requests.get("https://api.example.com/" + subscription_id)
if r.json()['subscribed'] == True:
result.append(r.json())
return result
def pass_args(args):
foo = get_api_response(*args)
if foo:
return foo
def check_response_amount(result):
if len(result) >= 5:
pool.terminate()
# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
pool = Pool(5)
pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
pool_result.wait()
pool.close()
pool.join()
应用程序检查每个数据库条目并返回每个具有 subscribed == True
的 api 响应,甚至无需运行回调。我尝试应用另一个问题( Python Multiprocessing help exit on condition )的答案,但无法让它工作。有人可以帮助我吗?
最佳答案
当您使用map_async
时,回调将在迭代中的每个工作项完成后才会执行。如果您希望对 request_tuples
中的每个项目执行回调,而不是仅在所有项目完成后执行,则需要在 for 循环内使用 apply_async
:
results = []
for item in request_tuples:
results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))
for result in results:
result.wait()
此外,调用pool.terminate
不会按照您想要的方式工作;一旦您调用它,您已经提交到池中的项目将永远挂起,这将使您的脚本挂起,因为您在退出之前等待它们完成。您可以通过等待池加入来解决此问题,而不是实际等待任何单个任务完成。
import time
from multiprocessing.dummy import Pool
from multiprocessing.pool import TERMINATE
def get_api_response(apikey, result, subscription_id):
url = ("https://api.example.com/" + str(subscription_id))
time.sleep(2)
result.append(url)
return result
def pass_args(args):
foo = get_api_response(*args)
if foo:
return foo
def check_response_amount(result):
if result and len(result) >= 5:
print("DONE %s" % result)
pool.terminate()
def get_db_entries():
return [{'subscription_id' : i} for i in range(100)]
# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
pool = Pool(2)
results = []
for item in request_tuples:
results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
pool.close()
pool.join()
print("done")
输出:
IN HERE
IN HERE
IN HERE
IN HERE
IN HERE
... (a bunch more of this)...
IN HERE
IN HERE
DONE ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
done
请注意,结果
列表最终可能会比您想要的稍大一些,因为terminate
调用实际上不会停止正在进行的任务。
关于Python 3 : Multiprocessing API calls with exit condition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29633490/
我正在尝试使用多处理和队列实现生产者-消费者场景;主进程是生产者,两个子进程使用队列中的数据。这在没有任何异常 发生的情况下有效,但问题是我希望能够在工作人员死亡时重新启动他们(kill -9 wor
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建
我的多处理需求非常简单:我从事机器学习工作,有时我需要评估多个数据集中的一个算法,或者一个数据集中的多个算法,等等。我只需要运行一个带有一些参数的函数并获取一个数字。 我不需要 RPC、共享数据,什么
创建进程池或简单地遍历一个进程以创建更多进程之间有任何区别(以任何方式)吗? 这有什么区别?: pool = multiprocessing.Pool(5) pool.apply_async(work
multiprocessing.BoundedSemaphore(3) 与 multiprocessing.Sempahore(3) 有何不同? 我希望 multiprocessing.Bounded
我尝试通过 multiprocessing 包中的 Queue 对 Pipe 的速度进行基准测试。我认为 Pipe 会更快,因为 Queue 在内部使用 Pipe。 奇怪的是,Pipe 在发送大型 n
我有这样一个简单的任务: def worker(queue): while True: try: _ = queue.get_nowait()
我正在尝试编写一个与 multiprocessing.Pool 同时应用函数的应用程序。我希望这个函数成为一个实例方法(所以我可以在不同的子类中以不同的方式定义它)。这似乎是不可能的;正如我在其他地方
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
我正在开发一个用于财务目的的模型。我将整个 S&P500 组件放在一个文件夹中,存储了尽可能多的 .hdf 文件。每个 .hdf 文件都有自己的多索引(年-周-分)。 顺序代码示例(非并行化): im
到目前为止,我是这样做的: rets=set(pool.map_async(my_callback, args.hosts).get(60*4)) 如果超时,我会得到一个异常: File "/usr
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。 解码和发
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失
multiprocessing.Queue 的文档指出从项目入队到其腌制表示刷新到底层管道之间存在一点延迟。显然,您可以将一个项目直接放入管道中(它没有说明其他情况,并且暗示情况就是如此)。 为什么管
我运行了一些测试代码来检查在 Linux 中使用 Pool 和 Process 的性能。我正在使用 Python 2.7。 multiprocessing.Pool 的源代码似乎显示它正在使用 mul
我在 Windows Standard Embedded 7 上运行 python 3.4.3。我有一个继承 multiprocessing.Process 的类。 在类的 run 方法中,我为进程对
我知道multiprocessing.Process类似于 threading.Thread当我子类 multiprocessing.Process 时要创建一个进程,我发现我不必调用 __init_
我有教科书声明说在多处理器系统中不建议禁用中断,并且会花费太多时间。但我不明白这一点,谁能告诉我多处理器系统禁用中断的过程?谢谢 最佳答案 在 x86(和其他架构,AFAIK)上,启用/禁用中断是基于
我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题? def f(values
我是一名优秀的程序员,十分优秀!