gpt4 book ai didi

DASK - 在执行期间停止工作人员会导致完成的任务启动两次

转载 作者:行者123 更新时间:2023-12-05 01:37:58 24 4
gpt4 key购买 nike

我想使用 dask 处理大约 5000 个批处理任务,这些任务将它们的结果存储在关系数据库中,在它们全部完成后,我想运行一个最终任务来查询数据库并生成一个结果文件(这将是存储在 AWS S3)

所以它或多或少是这样的:


来自 dask 进口袋,延迟
批处理 = bag.from_sequence(my_batches())
结果 = batches.map(process_batch_and_store_results_in_database)
图 = 延迟(read_database_and_store_bundled_result_into_s3)(结果)
client = Client('the_scheduler:8786')
client.compute(图)

这行得通,但是:在处理接近尾声时,许多工作人员处于空闲状态,我希望能够将它们关闭(并在 AWS EC2 上节省一些钱),但如果我这样做,调度程序将“忘记”那些任务已经完成,并尝试在剩余的工作人员上再次运行它们。

我知道这实际上是一个功能,而不是错误,因为 Dask 试图在开始 read_database_and_store_bundled_result_into_s3 之前跟踪所有结果,但是:有什么方法可以告诉 dask只需编排分布式处理图而不用担心状态管理?

最佳答案

我建议您在 Futures 完成后将其忘掉。此解决方案使用 dask.distributed concurrent.futures 接口(interface)而不是 dask.bag。特别是它使用 as_completed迭代器。

from dask.distributed import Client, as_completed
client = Client('the_scheduler:8786')

futures = client.map(process_batch_and_store_results_in_database, my_batches())

seq = as_completed(futures)
del futures # now only reference to the futures is within seq

for future in seq:
pass # let future be garbage collected

关于DASK - 在执行期间停止工作人员会导致完成的任务启动两次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44072816/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com