gpt4 book ai didi

python - 带有需要 dask 计算关键字参数的函数的自定义 dask 图

转载 作者:行者123 更新时间:2023-12-03 21:23:10 24 4
gpt4 key购买 nike

如何使用需要作为另一个 dask 任务结果的关键字参数的函数来构建自定义 dask 图?

dask 文档和几个stackoverflow 问题建议使用partial , toolz , 或 dask.compatibility.apply .所有这些解决方案都适用于静态关键字参数。我的理解来自 Including keyword arguments (kwargs) in custom Dask graphs对源代码和调试器的一些阅读是 dask.compatibility.apply可能能够使用作为 dask 计算结果的关键字参数。但是,我似乎无法获得正确的语法,也无法在其他地方找到答案。

下面的例子展示了一个相对简单的应用 dask.compatibility.apply使用 dask 计算的关键字值。 Dask 成功传递了计算参数的值 'a''b' ,以及静态关键字值 'other' .但是,它传递了字符串 'c'到函数,而不是用它的计算值替换它。

import dask
from dask.compatibility import apply


def custom_func(a, b, other=None, c=None):
print(a, b, other, c)
return a * b / c / other


dsk = {
'a': (sum, (1, 1)),
'b': (sum, (2, 2)),
'c': (sum, (3, 3)),
'd': (apply, custom_func, ['a', 'b'], {'c': 'c', 'other': 2})
}

dask.visualize(dsk, filename='graph.png')
for key in sorted(dsk):
print(key)
print(dask.get(dsk, key))
print('\n')

输出如下:
a
2


b
4


c
6


d
2 4 2 c
Traceback (most recent call last):
File "dask_kwarg.py", line 20, in <module>
print(dask.get(dsk, key))
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 562, in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 529, in get_async
fire_task()
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 504, in fire_task
callback=queue.put)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 551, in apply_sync
res = func(*args, **kwds)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 295, in execute_task
result = pack_exception(e, dumps)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 290, in execute_task
result = _execute_task(task, data)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/local.py", line 271, in _execute_task
return func(*args2)
File "/Users/holmgren/miniconda3/envs/pvlib36/lib/python3.6/site-packages/dask/compatibility.py", line 50, in apply
return func(*args, **kwargs)
File "dask_kwarg.py", line 7, in custom_func
return a * b / c / other
TypeError: unsupported operand type(s) for /: 'int' and 'str'

graph.png

最佳答案

一种方法是找出 dask.delayed 如何做到这一点:)

In [1]: import dask

In [2]: @dask.delayed
...: def f(*args, **kwargs):
...: pass
...:

In [3]: dict(f(x=1).dask)
Out[3]:
{'f-d2cd50e7-25b1-49c5-b463-f05198b09dfb': (<function dask.compatibility.apply>,
<function __main__.f>,
[],
(dict, [['x', 1]]))}

有趣的是,这也是本地调度器和分布式调度器不一致的情况。分布式调度程序可以很好地处理这个问题。
In [1]: from dask.distributed import Client

In [2]: client = Client()

In [3]: import dask
...: from dask.compatibility import apply
...:
...:
...: def custom_func(a, b, other=None, c=None):
...: print(a, b, other, c)
...: return a * b / c / other
...:
...:
...: dsk = {
...: 'a': (sum, (1, 1)),
...: 'b': (sum, (2, 2)),
...: 'c': (sum, (3, 3)),
...: 'd': (apply, custom_func, ['a', 'b'], {'c': 'c', 'other': 2})
...: }
...:

In [4]: for key in sorted(dsk):
...: print(key, client.get(dsk, key))
...:
a 2
b 4
c 6
2 4 2 6
d 0.6666666666666666

关于python - 带有需要 dask 计算关键字参数的函数的自定义 dask 图,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51178430/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com