gpt4 book ai didi

python - 如何从init运行异步协程,等待完成

转载 作者:行者123 更新时间:2023-12-03 06:45:11 25 4
gpt4 key购买 nike

我正在从__init__连接到aioredis(我不想将其移出,因为这意味着我必须进行一些额外的重大更改)。我如何等待下面的task示例代码中的aioredis连接__init__并打印self.subself.pub对象?目前它给出一个错误说

abc.py:42> exception=AttributeError("'S' object has no attribute 'pub'")



我确实看到创建了redis连接并且完成了coro create_connetion

有没有一种方法可以阻止来自 __init__的异步调用。如果我将 asyncio.wait替换为 asyncio.run_until_complete,则会收到一个粗略的错误提示

loop is already running.


asyncio.gather
import sys, json
from addict import Dict
import asyncio
import aioredis

class S():
def __init__(self, opts):
print(asyncio.Task.all_tasks())
task = asyncio.wait(asyncio.create_task(self.create_connection()), return_when="ALL_COMPLETED")
print(asyncio.Task.all_tasks())
print(task)
print(self.pub, self.sub)

async def receive_message(self, channel):
while await channel.wait_message():
message = await channel.get_json()
await asyncio.create_task(self.callback_loop(Dict(json.loads(message))))

async def run_s(self):
asyncio.create_task(self.listen())
async def callback_loop(msg):
print(msg)

self.callback_loop = callback_loop

async def create_connection(self):
self.pub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
self.sub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
self.db = await aioredis.create_redis("redis://c8:7070/0", password="abc")
self.listener = await self.sub.subscribe(f"abc")

async def listen(self):
self.tsk = asyncio.ensure_future(self.receive_message(self.listener[0]))
await self.tsk

async def periodic(): #test function to show current tasks
number = 5
while True:
await asyncio.sleep(number)
print(asyncio.Task.all_tasks())

async def main(opts):
loop.create_task(periodic())
s = S(opts)
print(s.pub, s.sub)
loop.create_task(s.run_s())

if __name__ == "__main__":
loop = asyncio.get_event_loop()
main_task = loop.create_task(main(sys.argv[1:]))
loop.run_forever() #I DONT WANT TO MOVE THIS UNLESS IT IS NECESSARY

最佳答案

我认为您想要做的是确保函数create_connections在S构造函数之前运行完成。一种方法是重新整理代码。将create_connections函数移到类之外:

async def create_connection():
pub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
sub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
db = await aioredis.create_redis("redis://c8:7070/0", password="abc")
listener = await self.sub.subscribe(f"abc")
return pub, sub, db, listener

现在,在构造S之前等待该函数。因此您的主要函数变为:
async def main(opts):
loop.create_task(periodic())
x = await create_connections()
s = S(opts, x) # pass the result of create_connections to S
print(s.pub, s.sub)
loop.create_task(s.run_s())

现在修改S构造函数以接收创建的对象:
def __init__(self, opts, x):
self.pub, self.sub, self.db, self.listener = x

我不确定您要使用return_when参数和对asyncio.wait的调用做什么。 create_connections函数不会启动一组并行任务,而是等待每个调用,然后再继续执行下一个任务。也许可以通过并行运行四个调用来提高性能,但这是一个不同的问题。

关于python - 如何从init运行异步协程,等待完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60554080/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com