gpt4 book ai didi

multithreading - Dask:如何在 dask 延迟的情况下并行化我的代码?

转载 作者:行者123 更新时间:2023-12-03 11:51:35 25 4
gpt4 key购买 nike

这是我第一次尝试并行处理,我一直在研究 Dask,但我在实际编码时遇到了麻烦。

我看过他们的示例和文档,我认为 dask.delayed 效果最好。我试图用 delay(function_name) 包装我的函数,或者添加一个 @delayed 装饰器,但我似乎无法让它正常工作。我更喜欢 Dask 而不是其他方法,因为它是用 python 制作的,并且(假设)简单。我知道 dask 在 for 循环中不起作用,但他们说它可以在循环内工作。

我的代码通过一个包含其他函数输入的函数传递文件,如下所示:

from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
name = name.split('.')[0]
....

然后做一些预处理例如:
    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)

然后我调用一个构造函数并将 pre_results 传递给函数调用:
    fc = FunctionCalls()
Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
input_data=pre_result1, model1=pre_result2)

我在这里做的是将文件传递到 for 循环中,进行一些预处理,然后将文件传递到两个模型中。

关于如何并行化的想法或提示?我开始遇到奇怪的错误,我不知道如何修复代码。代码按原样工作。我使用了一堆 Pandas 数据帧、系列和 numpy 数组,我不想回去改变一切以使用 dask.dataframes 等。

我评论中的代码可能难以阅读。这是一种更格式化的方式。

在下面的代码中,当我输入 print(mean_squared_error) 我只是得到: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']

for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = delayed(mse)(observed, prediction)

最佳答案

您需要调用 dask.compute 以最终计算结果。见 dask.delayed documentation .

顺序码

import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

results = []
for count, name in enumerate(filenames):
file1 = pd.read_csv(name)
df = pd.DataFrame(file1) # isn't this already a dataframe?
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = mse(observed, prediction)
results.append(mean_squared_error)

并行代码
import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
df = dask.delayed(pd.read_csv)(name)
prediction = df['Close'][:-1]
observed = df['Close'][1:]
mean_squared_error = dask.delayed(mse)(observed, prediction)
delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)

关于multithreading - Dask:如何在 dask 延迟的情况下并行化我的代码?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42550529/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com