gpt4 book ai didi

python - 异步 : Is it possible to cancel a future been run by an Executor?

转载 作者:太空狗 更新时间:2023-10-30 00:19:12 56 4
gpt4 key购买 nike

我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这似乎对我不起作用。

代码如下:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)

print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)

print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我希望上面的代码只允许阻塞函数输出:

blocking 0/5
blocking 1/5

然后查看非阻塞函数的输出。但是,即使在我取消后,阻塞的 future 仍在继续。

这可能吗?还有其他方法吗?

谢谢

编辑:更多关于使用 asyncio 运行阻塞和非阻塞代码的讨论:How to interface blocking and non-blocking code with asyncio

最佳答案

在这种情况下,一旦 Future 实际开始运行,就无法取消它,因为您依赖于 concurrent.futures.Future 的行为, 和 its docs state the following :

cancel()

Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

因此,取消成功的唯一情况是任务仍在 Executor 中挂起。现在,您实际上使用的是 asyncio.Future 包裹在 concurrent.futures.Future 中,实际上 asyncio.Future 返回loop.run_in_executor() 将引发一个 CancellationError 如果您在调用 cancel() 后尝试yield from ,即使底层任务实际上已经在运行。但是,它实际上不会取消 Executor 中任务的执行。

如果您确实需要取消任务,则需要使用更传统的方法来中断线程中运行的任务。你如何做的细节取决于用例。对于示例中提供的用例,您可以使用 threading.Event:

def blocking_func(seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)

print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel() # Mark Future as cancelled
event.set() # Actually interrupt blocking_func

关于python - 异步 : Is it possible to cancel a future been run by an Executor?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37792204/

56 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com