- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这似乎对我不起作用。
代码如下:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
我希望上面的代码只允许阻塞函数输出:
blocking 0/5
blocking 1/5
然后查看非阻塞函数的输出。但是,即使在我取消后,阻塞的 future 仍在继续。
这可能吗?还有其他方法吗?
谢谢
编辑:更多关于使用 asyncio 运行阻塞和非阻塞代码的讨论:How to interface blocking and non-blocking code with asyncio
最佳答案
在这种情况下,一旦 Future
实际开始运行,就无法取消它,因为您依赖于 concurrent.futures.Future
的行为, 和 its docs state the following :
cancel()
Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return
False
, otherwise the call will be cancelled and the method will returnTrue
.
因此,取消成功的唯一情况是任务仍在 Executor
中挂起。现在,您实际上使用的是 asyncio.Future
包裹在 concurrent.futures.Future
中,实际上 asyncio.Future
返回loop.run_in_executor()
将引发一个 CancellationError
如果您在调用 cancel()
后尝试yield from
,即使底层任务实际上已经在运行。但是,它实际上不会取消 Executor
中任务的执行。
如果您确实需要取消任务,则需要使用更传统的方法来中断线程中运行的任务。你如何做的细节取决于用例。对于示例中提供的用例,您可以使用 threading.Event
:
def blocking_func(seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel() # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
关于python - 异步 : Is it possible to cancel a future been run by an Executor?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37792204/
我是一名优秀的程序员,十分优秀!