
python - How to handle exceptions in Dask distributed

Reposted. Author: 太空狗. Updated: 2023-10-29 21:10:41

I've had a lot of success building data analysis pipelines with dask.distributed. However, one thing I'd still like to improve is how I handle exceptions.

Right now, if I write the following:

import dask.bag

def my_function(value):
    return 1 / value

results = (dask.bag
           .from_sequence(range(-10, 10))
           .map(my_function))

print(results.compute())

... then when I run the program I get a long series of tracebacks (one per worker, I assume). The most relevant part is:

distributed.utils - ERROR - division by zero
Traceback (most recent call last):
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in _get
    result = yield self._gather(packed)
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 923, in _gather
    st.traceback)
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/mnt/lustrefs/work/aurelien.mazurie/test_dask/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/dask/bag/core.py", line 1411, in reify
  File "test.py", line 9, in my_function
    return 1 / value
ZeroDivisionError: division by zero

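Of course, here a visual inspection tells me the error was a division by zero. What I'd like to know is whether there is a better way to track these errors. For example, I can't seem to catch the exception itself: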

import dask.bag
import distributed

try:
    dask_scheduler = "127.0.0.1:8786"
    dask_client = distributed.Client(dask_scheduler)

    def my_function(value):
        return 1 / value

    results = (dask.bag
               .from_sequence(range(-10, 10))
               .map(my_function))

    #dask_client.persist(results)

    print(results.compute())

except Exception as e:
    print("error: %s" % e)

Edit: note that in my example I'm using distributed, not just plain dask. A dask-scheduler is listening on port 8786 with four dask-worker processes registered to it.

This code produces exactly the same output as above, which means my try/except block isn't actually catching the exception.

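Since we're talking about tasks distributed across a cluster, getting exceptions propagated back to me is obviously important. Are there any guidelines for doing this? My current solution is to have the function return both a result and an optional error message, and then handle the results and the error messages separately: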

def my_function(value):
    try:
        return {"result": 1 / value, "error": None}
    except ZeroDivisionError:
        return {"result": None, "error": "boom!"}

results = (dask.bag
           .from_sequence(range(-10, 10))
           .map(my_function))

# persist() returns a new, persisted collection; keep it, otherwise
# each .compute() below recomputes the map from scratch
results = dask_client.persist(results)

errors = (results
          .pluck("error")
          .filter(lambda x: x is not None)
          .compute())

print(errors)

results = (results
           .pluck("result")
           .filter(lambda x: x is not None)
           .compute())

print(results)
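The return-a-dict approach can be factored into a reusable decorator so that my_function itself contains no error-handling code. This is a minimal sketch, assuming you want the offending input, the exception type and message, and the formatted traceback recorded instead of a fixed "boom!"; `capture_errors` is a hypothetical helper, not part of Dask. The wrapped function is plain Python, so it could be passed to `dask.bag.Bag.map` unchanged; the built-in `map` stands in here so the sketch runs without a cluster.

```python
import functools
import traceback


def capture_errors(func):
    """Hypothetical helper: make func return {"result", "error"} instead of raising."""
    @functools.wraps(func)
    def wrapper(value):
        try:
            return {"result": func(value), "error": None}
        except Exception as exc:
            # Record plain, picklable data so it can travel back from a worker:
            # the input, the exception type/message, and the traceback text.
            return {
                "result": None,
                "error": {
                    "input": value,
                    "type": type(exc).__name__,
                    "message": str(exc),
                    "traceback": traceback.format_exc(),
                },
            }
    return wrapper


@capture_errors
def my_function(value):
    return 1 / value


# With Dask this would be dask.bag.from_sequence(...).map(my_function);
# the built-in map is used so the sketch runs locally.
records = list(map(my_function, range(-10, 10)))

# Split on the "error" field, so a legitimate None result is not dropped.
errors = [r["error"] for r in records if r["error"] is not None]
values = [r["result"] for r in records if r["error"] is None]

print(errors[0]["type"], errors[0]["input"])  # ZeroDivisionError 0
print(len(values))  # 19
```

Filtering values on the "error" key rather than on `result is not None` is a deliberate choice: a function that legitimately returns None would otherwise have its successes silently discarded.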

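This works, but I'm wondering whether I'm sandblasting the soup cracker here (i.e. overengineering it).

Edit: another option would be to use something like a Maybe monad, but again I wonder whether I'm overthinking this.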
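The monad idea can be sketched in a few lines of plain Python. Strictly speaking this is an Either/Result-style container rather than a bare Maybe, because it keeps the exception instead of discarding it; the `Result` class and `reciprocal` function are hypothetical names for illustration, not Dask types. Storing the exception object assumes it can be pickled when results travel back from a worker, which holds for built-in exceptions such as ZeroDivisionError but may not for custom ones.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Result:
    """Hypothetical Either/Result-style container: a value or an exception."""
    value: Any = None
    error: Optional[BaseException] = None

    @property
    def ok(self) -> bool:
        return self.error is None

    def map(self, func: Callable[[Any], Any]) -> "Result":
        # Short-circuit: after a failed step, later steps are skipped and
        # the original exception is carried through unchanged.
        if not self.ok:
            return self
        try:
            return Result(value=func(self.value))
        except Exception as exc:
            return Result(error=exc)


def reciprocal(value):
    return 1 / value


# Chain two steps; the input 0 fails in the first and skips the second.
results = [Result(value=v).map(reciprocal).map(lambda x: x * 2)
           for v in range(-10, 10)]
failures = [r for r in results if not r.ok]

print(len(failures), type(failures[0].error).__name__)  # 1 ZeroDivisionError
```

The benefit over the dict version shows up when a pipeline has several steps: each step is written for the happy path, and a failure anywhere is carried to the end without extra checks in between.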

Best answer

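Dask automatically packages exceptions raised remotely and re-raises them locally. The "distributed.utils - ERROR" line is only a log message printed by Dask; the exception itself still reaches the except block. Here is what I get when I run your example: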

In [1]: from dask.distributed import Client

In [2]: client = Client('localhost:8786')

In [3]: import dask.bag

In [4]: try:
   ...:     def my_function (value):
   ...:         return 1 / value
   ...:
   ...:     results = (dask.bag
   ...:                .from_sequence(range(-10, 10))
   ...:                .map(my_function))
   ...:
   ...:     print(results.compute())
   ...:
   ...: except Exception as e:
   ...:     import pdb; pdb.set_trace()
   ...:     print("error: %s" % e)
   ...:
distributed.utils - ERROR - division by zero
> <ipython-input-4-17aa5fbfb732>(13)<module>()
-> print("error: %s" % e)
(Pdb) pp e
ZeroDivisionError('division by zero',)
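The same propagation behavior can be illustrated without a cluster using the standard library's `concurrent.futures`. This is only an analogy, not how Dask is implemented: an exception raised inside a worker is stored on its future and re-raised when the caller asks for the result, so an ordinary try/except around that call catches it. The second half shows a per-task alternative that collects failures instead of stopping at the first one. `dask.distributed.Future` exposes similarly named `result()` and `exception()` methods, and `Client.gather` accepts an `errors="skip"` option, so the pattern should carry over, but check the details against the distributed version you use.

```python
from concurrent.futures import ThreadPoolExecutor


def my_function(value):
    return 1 / value


inputs = list(range(-10, 10))

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(my_function, v) for v in inputs]

    # 1) All-or-nothing: the exception raised in the worker thread is
    #    stored on its future and re-raised here by result().
    caught = None
    try:
        values = [f.result() for f in futures]
    except ZeroDivisionError as exc:
        caught = exc
        print("error: %s" % exc)

    # 2) Per task: exception() waits for the task and returns the exception
    #    it raised (or None), so failures are collected without aborting.
    failed = {}
    ok = []
    for value, future in zip(inputs, futures):
        exc = future.exception()
        if exc is None:
            ok.append(future.result())
        else:
            failed[value] = exc

print(sorted(failed), len(ok))  # [0] 19
```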

For the original discussion of "python - How to handle exceptions in Dask distributed", see the Stack Overflow question: https://stackoverflow.com/questions/42519966/
