gpt4 book ai didi

Python 3 : Multiprocessing API calls with exit condition

转载 作者:太空宇宙 更新时间:2023-11-03 17:47:01 24 4
gpt4 key购买 nike

我正在尝试编写一个应用程序,它可以处理一系列数据库条目,使用这些条目进行 API 调用,返回值,并且如果 API JSON 响应的一个值为 True 则为 5电话,我想要这 5 个电话的列表。由于数据库条目有几千个条目,我想通过多处理来实现这一点。但我是并行化的初学者,似乎我无法掌握它是如何工作的以及如何设置退出条件。这是我得到的:

from multiprocessing.dummy import Pool
import requests

def get_api_response(apikey, result, subscription_id):
r = requests.get("https://api.example.com/" + subscription_id)
if r.json()['subscribed'] == True:
result.append(r.json())
return result

def pass_args(args):
foo = get_api_response(*args)
if foo:
return foo

def check_response_amount(result):
if len(result) >= 5:
pool.terminate()

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
pool = Pool(5)
pool_result = pool.map_async(pass_args, request_tuples, callback=check_response_amount)
pool_result.wait()
pool.close()
pool.join()

应用程序检查每个数据库条目并返回每个具有 subscribed == True 的 api 响应,甚至无需运行回调。我尝试应用另一个问题( Python Multiprocessing help exit on condition )的答案,但无法让它工作。有人可以帮助我吗?

最佳答案

当您使用map_async时,回调将在迭代中的每个工作项完成后才会执行。如果您希望对 request_tuples 中的每个项目执行回调,而不是仅在所有项目完成后执行,则需要在 for 循环内使用 apply_async :

results = []
for item in request_tuples:
results.append(pool.apply_async(get_api_response, args=item, callback=check_response_amount))

for result in results:
result.wait()

此外,调用pool.terminate不会按照您想要的方式工作;一旦您调用它,您已经提交到池中的项目将永远挂起,这将使您的脚本挂起,因为您在退出之前等待它们完成。您可以通过等待池加入来解决此问题,而不是实际等待任何单个任务完成。

import time
from multiprocessing.dummy import Pool
from multiprocessing.pool import TERMINATE

def get_api_response(apikey, result, subscription_id):
url = ("https://api.example.com/" + str(subscription_id))
time.sleep(2)
result.append(url)
return result

def pass_args(args):
foo = get_api_response(*args)
if foo:
return foo

def check_response_amount(result):
if result and len(result) >= 5:
print("DONE %s" % result)
pool.terminate()


def get_db_entries():
return [{'subscription_id' : i} for i in range(100)]

# One entry looks like that: {"id": 1, "name": "smith", "subscription_id": 123}
db_entries = get_db_entries()
apikey = 'abcd1234'
result = []
request_tuples = [(apikey, result, entry['subscription_id']) for entry in db_entries]
pool = Pool(2)
results = []
for item in request_tuples:
results.append(pool.apply_async(get_api_response, item, callback=check_response_amount))
pool.close()
pool.join()
print("done")

输出:

IN HERE
IN HERE
IN HERE
IN HERE
IN HERE
... (a bunch more of this)...
IN HERE
IN HERE
DONE ['https://api.example.com/1', 'https://api.example.com/0', 'https://api.example.com/2', 'https://api.example.com/3', 'https://api.example.com/4', 'https://api.example.com/5']
done

请注意,结果列表最终可能会比您想要的稍大一些,因为terminate调用实际上不会停止正在进行的任务。

关于Python 3 : Multiprocessing API calls with exit condition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29633490/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com