gpt4 book ai didi

python - 存储结果 ThreadPoolExecutor

转载 作者:行者123 更新时间:2023-12-02 11:33:38 27 4
gpt4 key购买 nike

我对“concurrent.futures”的并行处理相当陌生,我正在测试一些简单的实验。我编写的代码似乎可以工作,但我不确定如何存储结果。我尝试创建一个列表(“ future ”)并将结果附加到该列表中,但这大大减慢了过程。我想知道是否有更好的方法来做到这一点。谢谢。

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab={}
for i in range(100):
for j in range(100):
dtab[i,j]=i+j/2
couple_ods.append((i,j))

avg_speed=100
def task(i):
origin=i[0]
destination=i[1]
time.sleep(0.01)
distance=dtab[origin,destination]/avg_speed
return distance
start1=time.time()
def main():
with concurrent.futures.ThreadPoolExecutor() as executor:
for number in couple_ods:
future=executor.submit(task,number)
futures.append(future.result())

if __name__ == '__main__':
main()
end1=time.time()

最佳答案

当您调用 future.result() 时,它将阻塞,直到值准备好为止。因此,您不会从并行性中获得任何好处 - 您启动一项任务,等待它完成,启动另一项任务,等待它完成,等等。

当然,您的示例首先不会从线程中受益。您的任务除了 CPU 限制的 Python 计算之外什么也不做,这意味着(至少在 CPython、MicroPython 和 PyPy 中,它们是 concurrent.futures 附带的唯一完整实现),GIL (全局解释器锁)将阻止多个线程同时进行。

希望您的真实程序有所不同。如果它正在执行 I/O 绑定(bind)的操作(发出网络请求、读取文件等),或者使用像 NumPy 这样的扩展库来释放 GIL 来解决繁重的 CPU 工作,那么它会工作得很好。但除此之外,您需要在此处使用 ProcessPoolExecutor

<小时/>

无论如何,您想要做的是将 future 本身附加到一个列表中,这样您就可以在等待其中任何一个之前获得所有 future 的列表:

for number in couple_ods:
future=executor.submit(task,number)
futures.append(future)

然后,在开始所有作业后,您可以开始等待它们。共有三个简单的选项,当您需要更多控制时,还有一个复杂的选项。

<小时/>

(1) 您可以直接循环它们以按照提交的顺序等待它们:

for future in futures:
result = future.result()
dostuff(result)

(2) 如果您需要等待它们全部完成后再进行任何工作,您可以调用wait:

futures, _ = concurrent.futures.wait(futures)
for future in futures:
result = future.result()
dostuff(result)

(3) 如果您想在每个准备就绪后立即对其进行处理,即使它们不按顺序排列,请使用 as_completed:

for future in concurrent.futures.as_completed(futures): 
dostuff(future.result())

请注意,文档中使用此函数的示例提供了某种方法来识别哪个任务已完成。如果您需要,它可以很简单,只需传递每个索引,然后返回索引,real_result,然后您可以for index, result in ...进行循环.

(4) 如果您需要更多控制,您可以对到目前为止所做的任何事情进行 wait 循环:

while futures:
done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
for future in done:
result = future.result()
dostuff(result)

该示例与 as_completed 执行相同的操作,但您可以在其上编写较小的变体来执行不同的操作,例如等待所有操作完成,但如果出现异常则提前取消。

<小时/>

对于许多简单的情况,您可以仅使用执行器的 map 方法来简化第一个选项。这就像内置的 map 函数一样,为参数中的每个值调用一次函数,然后为您提供一些可以循环以按相同顺序获取结果的内容,但它是并行执行的。所以:

for result in executor.map(task, couple_ods):
dostuff(result)

关于python - 存储结果 ThreadPoolExecutor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52082665/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com