gpt4 book ai didi

django.channels 异步消费者似乎没有异步执行

转载 作者:行者123 更新时间:2023-12-02 20:30:35 25 4
gpt4 key购买 nike

我已将 django.channels 添加到 django 项目中,以支持长时间运行的进程,通过 websockets 通知用户进度。

一切似乎都工作正常,除了长时间运行的进程的实现似乎没有异步响应之外。

为了测试,我创建了一个 AsyncConsumer 来识别两种类型的消息“run”和“isBusy”。

“运行”消息处理程序设置“繁忙标志”,发回“进程正在运行”消息,异步等待 20 秒,重置“繁忙标志”,然后发回“进程”完整消息'

“isBusy”消息返回一 strip 有忙碌标志状态的消息。

我的期望是,如果我发送运行消息,我将立即收到“进程正在运行”消息,20 秒后我将收到“进程完成”消息。这按预期工作。

我还希望,如果我发送“isBusy”消息,我将立即收到带有标志状态的响应。

观察到的行为如下:

  • (从客户端)发送消息“run”
  • 立即收到一条消息“正在运行,请稍等”
  • (从客户端)发送消息“isBusy”
  • 消息到达服务器端的 Web 套接字监听器
  • 在运行处理程序完成之前不会发生任何事情
  • 客户端收到“运行完成”消息
  • 紧接着是“process isBusy:False”消息

这是 Channel 监听器的实现:

class BackgroundConsoleConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.busy = False

async def run(self, message):
print("run got message", message)
self.busy = True
await self.channel_layer.group_send('consoleChannel',{
"type":"consoleResponse",
"text":"running please wait"
})
await asyncio.sleep(20)
self.busy = False
await self.channel_layer.group_send('consoleChannel',{
"type":"consoleResponse",
"text": "finished running"
})

async def isBusy(self,message):
print('isBusy got message', message)
await self.channel_layer.group_send('consoleChannel',{
"type":"consoleResponse",
"text": "process isBusy:{0}".format(self.busy)
})

channel 在路由文件中设置如下:

application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter([
url("^console/$", ConsoleConsumer),
])

),
"channel": ChannelNameRouter({
"background-console":BackgroundConsoleConsumer,
}),
})

我使用一名工作人员运行该 channel (通过 ./manage.py runworker )。

实验是使用 django 测试服务器(通过 runserver)完成的。

任何关于为什么 channel 消费者似乎没有异步工作的想法将不胜感激。

最佳答案

经过一番挖掘后,我们发现了问题所在以及解决方案。

channel 将发送给它的消息添加到 asyncio.Queue 中并按顺序处理它们。

仅仅释放协程控制(通过 asyncio.sleep() 或类似的方法)是不够的,必须在消费者接收到新消息之前完成消息处理程序的处理。

这是对上一个示例的修复,其行为符合预期(即在处理 run 长时间运行任务时响应 isBusy 消息)

感谢@user4815162342的建议。

class BackgroundConsoleConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.busy = False

async def run(self, message):
loop = asyncio.get_event_loop()
loop.create_task(self.longRunning())

async def longRunning(self):
self.busy = True
await self.channel_layer.group_send('consoleChannel',{
"type":"the.type",
"text": json.dumps({'message': "running please wait", 'author': 'background console process'})
})
print('before sleeping')
await asyncio.sleep(20)
print('after sleeping')
self.busy = False
await self.channel_layer.group_send('consoleChannel',{
"type":"the.type",
"text": json.dumps({'message': "finished running", 'author': 'background console process'})
})

async def isBusy(self,message):
print('isBusy got message', message)
await self.channel_layer.group_send('consoleChannel',{
"type":"the.type",
"text": json.dumps({'message': "process isBusy:{0}".format(self.busy),
'author': 'background console process'})
})

关于django.channels 异步消费者似乎没有异步执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48871831/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com