gpt4 book ai didi

python - Streamz/Dask : gather does not wait for all results of buffer

转载 作者:行者123 更新时间:2023-12-02 09:01:48 24 4
gpt4 key购买 nike

进口:

from dask.distributed import Client
import streamz
import time

模拟工作负载:

def increment(x):
time.sleep(0.5)
return x + 1

假设我想在本地 Dask 客户端上处理一些工作负载:

if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).gather().sink(print)

for i in range(10):
ps.emit(i)

这按预期工作,但是 sink(print) 当然会强制等待每个结果,因此流不会并行执行。

但是,如果我使用 buffer() 允许缓存结果,则 gather() 似乎不再正确收集所有结果,并且解释器会在之前退出得到结果。这种方法:

if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)
# ^
for i in range(10): # - allow parallel execution
ps.emit(i) # - before gather()

...不为我打印任何结果。 Python 解释器在启动脚本后不久就会退出,并且 before buffer() 发出其结果,因此什么也没有被打印。

但是,如果主进程被迫等待一段时间,结果将以并行方式打印(因此它们不会互相等待,而是几乎同时打印):

if __name__ == "__main__":
with Client() as dask_client:
ps = streamz.Stream()
ps.scatter().map(increment).buffer(10).gather().sink(print)

for i in range(10):
ps.emit(i)

time.sleep(10) # <- force main process to wait while ps is working

这是为什么呢?我认为 gather() 应该等待一批 10 个结果,因为 buffer() 应该并行缓存正好 10 个结果,然后再将它们刷新到 gather().为什么在这种情况下 gather() 不会阻塞?

是否有一种很好的方法来检查 Stream 是否仍包含正在处理的元素以防止主进程过早退出?

最佳答案

  1. “这是为什么?”:因为 Dask 分布式调度程序(执行流映射器和接收器函数)和 Python 脚本在不同的进程中运行。当“with” block 上下文结束时,您的 Dask 客户端将关闭,并且在发送到流的项目能够到达接收器函数之前关闭执行。

  2. “有没有一种好的方法来检查 Stream 是否仍然包含正在处理的元素”:我不知道。 但是:如果您想要的行为是(我只是在这里猜测)并行处理一堆项目,那么 Streamz 不是您应该使用的,普通 Dask 就足够了。

关于python - Streamz/Dask : gather does not wait for all results of buffer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60032474/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com