gpt4 book ai didi

python - 异步和 rabbitmq (asynqp) : how to consume from multiple queues concurrently

转载 作者:太空宇宙 更新时间:2023-11-04 00:47:18 24 4
gpt4 key购买 nike

我正在尝试使用 python、asyncio 和 asynqp 同时使用多个队列.

我不明白为什么我的 asyncio.sleep() 函数调用没有任何效果。代码不会停在那里。公平地说,我实际上不明白回调是在哪个上下文中执行的,以及我是否可以完全将控制权交给事件循环(这样 asyncio.sleep() 调用就会感)。

如果我必须在我的 process_msg 回调函数中使用 aiohttp.ClientSession.get() 函数调用怎么办?我做不到,因为它不是协程。必须有一种方法超出我目前对 asyncio 的理解。

#!/usr/bin/env python3

import asyncio
import asynqp


USERS = {'betty', 'bob', 'luis', 'tony'}


def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()

async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')

# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)

# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)


loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()

最佳答案

I don't understand why my asyncio.sleep() function call does not have any effect.

因为 asyncio.sleep() 返回一个必须与事件循环(或 async/await 语义)结合使用的 future 对象。

你不能在简单的 def 声明中使用 await 因为回调是在附加到某些的 async/await 上下文之外调用的引擎盖下的事件循环。换句话说,将回调样式与 async/await 样式混合是非常棘手的。

虽然简单的解决方案是将工作安排回事件循环:

async def process_msg(msg):
await asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()

def _process_msg(msg):
loop = asyncio.get_event_loop()
loop.create_task(process_msg(msg))
# or if loop is always the same one single line is enough
# asyncio.ensure_future(process_msg(msg))

# some code
await queue.consume(_process_msg)

请注意,_process_msg 函数中没有递归,即 _process_msg 的主体不会在 _process_msg 中执行。一旦控件返回到事件循环,将调用内部 process_msg 函数。

这可以用下面的代码概括:

def async_to_callback(coro):
def callback(*args, **kwargs):
asyncio.ensure_future(coro(*args, **kwargs))
return callback

async def process_msg(msg):
# the body

# some code
await queue.consume(async_to_callback(process_msg))

关于python - 异步和 rabbitmq (asynqp) : how to consume from multiple queues concurrently,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38895872/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com