
Python Asyncio is not running a new coroutine using asyncio.run_coroutine_threadsafe

Repost. Author: 行者123. Updated: 2023-12-01 00:14:01

1> Python asyncio does not run the new coroutine when it is submitted with asyncio.run_coroutine_threadsafe. Below is the code, tested on a Mac.
————————————————————————————————

import os
import random
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor

os.environ['PYTHONASYNCIODEBUG'] = '1'
logging.basicConfig(level=logging.WARNING)


async def coro():
    print("Coroutine has started")


async def main(loop):
    print(" 1 ")
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    print(f"Future -- {fut}")
    print(" 2 ")
    print(" Result ", fut.result())
    print(" 3 ")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main(loop))

—————————————————————————————————
Output:

 1       
Future -- <Future at 0x102a05358 state=pending>
2

=================================================================================

2> When I run it on a different executor, the output is the same, as shown below.
——————————————————————————————————————

new_loop = asyncio.new_event_loop()      
new_loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
fut = asyncio.run_coroutine_threadsafe(coro(), new_loop)

———————————————————————————————————————
Sample code:

import os
import random
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor

os.environ['PYTHONASYNCIODEBUG'] = '1'
logging.basicConfig(level=logging.WARNING)


async def coro():
    print("Coroutine has started")


async def main(loop):
    print(" 1 ")
    new_loop = asyncio.new_event_loop()
    new_loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
    fut = asyncio.run_coroutine_threadsafe(coro(), new_loop)
    print(f"Future -- {fut}")
    print(" 2 ")
    print(" Result ", fut.result())
    print(" 3 ")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(main(loop))

—————————————————————————————————
Output:

 1
Future -- <Future at 0x102f5d668 state=pending>
2

Best answer

From the documentation for asyncio.run_coroutine_threadsafe:

This function is meant to be called from a different OS thread than the one where the event loop is running
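
That sentence accounts for the hang in the first attempt: main() runs on the loop's own thread, so fut.result() blocks the only thread that could ever execute coro(), and the future stays pending. When the caller is already a coroutine on the same loop, run_coroutine_threadsafe is not needed at all. The following is a minimal sketch of that same-thread case, assuming Python 3.7 or later (for asyncio.run and asyncio.create_task); the return value 42 is hypothetical and is only there to make the result visible:

```python
import asyncio


async def coro():
    print("Coroutine has started")
    return 42  # hypothetical return value, for illustration only


async def main():
    # Same thread, same loop: await the coroutine directly ...
    direct = await coro()

    # ... or schedule it as a task and await the task later.
    task = asyncio.create_task(coro())
    scheduled = await task
    return direct, scheduled


if __name__ == "__main__":
    print(asyncio.run(main()))
```

When the call really does originate in another thread, run_coroutine_threadsafe is the appropriate tool, as in the answer's example: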

import asyncio
from threading import Thread


async def coro():
    print("Coroutine has started")


def start_coro(loop):
    # Runs in a worker thread, so blocking on fut.result() does not stall the loop.
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    print(fut.result())


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    Thread(target=start_coro, args=(loop,)).start()
    loop.run_forever()
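
The snippet above leaves the loop running forever, so the process never exits on its own. Below is a minimal sketch of one possible variant, not the answer author's code: the loop runs in a background thread, the coroutine is submitted from the main thread, and the loop is then stopped and closed. It also shows what the question's second attempt was missing, namely that a loop created with asyncio.new_event_loop() executes nothing until some thread actually runs it. The helper name run_in_background_loop, the "done" return value, and the 5-second timeout are assumptions made for illustration.

```python
import asyncio
import threading


async def coro():
    print("Coroutine has started")
    return "done"  # hypothetical return value, for illustration only


def run_in_background_loop():
    # Create a fresh loop and run it in a separate thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    # We are on the main thread, not the loop's thread, so blocking
    # on result() cannot stall the loop.
    fut = asyncio.run_coroutine_threadsafe(coro(), loop)
    result = fut.result(timeout=5)  # timeout is an illustrative choice

    # Ask the loop to stop from outside its thread, then clean up.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return result


if __name__ == "__main__":
    print(run_in_background_loop())
```

Passing a timeout to fut.result() is a defensive choice: if the loop is not running, the call raises a timeout error instead of hanging the way the question's code does.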

Regarding "Python Asyncio is not running a new coroutine using asyncio.run_coroutine_threadsafe", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59443750/
