gpt4 book ai didi

dask - 可以使用无休止的流式输入进行工作

转载 作者:行者123 更新时间:2023-12-01 08:01:08 27 4
gpt4 key购买 nike

我知道 dask 在这样的批处理模式下运行良好

def load(filename):
...

def clean(data):
...

def analyze(sequence_of_data):
...

def store(result):
with open(..., 'w') as f:
f.write(result)

dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
  • 我们可以使用 dask 来处理流 channel ,其中 block 的数量是未知的甚至是无穷无尽的吗?
  • 它可以以增量方式执行计算吗?例如,上面的“分析”步骤可以处理正在进行的 block 吗?
  • 我们必须在所有数据 block 都知道之后才调用“get”操作,我们是否可以在调用“get”之后添加新 block
  • 最佳答案

    编辑:请参阅下面的更新答案

    dask 中的当前任务调度程序需要一个计算图。它不支持从该图中动态添加或删除。调度器旨在评估少量内存中的大图;提前了解整个图表对此至关重要。
    但是,这并不能阻止创建具有不同属性的其他调度程序。一个简单的解决方案就是使用像 conncurrent.futures 这样的模块。在单台机器上或 distributed 在多台机器上。
    其实,是
    分布式调度程序现在完全异步运行,您可以在计算期间提交任务、等待其中一些、提交更多、取消任务、添加/删除工作人员等。有几种方法可以做到这一点,但最简单的可能是新的concurrent.futures。这里简要描述的界面:
    http://dask.pydata.org/en/latest/futures.html

    关于dask - 可以使用无休止的流式输入进行工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33952313/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com