
python - What is the correct way to call async code from a regular callback?

Reposted. Author: 行者123. Updated: 2023-11-30 22:48:16

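What is the correct way to call async code from a regular callback? This code works, but it doesn't look pretty. I don't like how the response is invoked: the caller's address has to be passed through every function. How do I set a timeout for the handler?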

I have put my questions in the code comments.

import asyncio
import logging

logger = logging.getLogger('protocol')


async def echo(data):
    # external lib emulator
    data = b'>' + data
    await asyncio.sleep(1)
    return data


class Protocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        logger.debug('connection_made called')
        self.transport = transport

    def respond(self, task):
        logger.debug('respond called')
        # I want to get the data as arguments, not via the task
        resp, caller = task.result()
        self.transport.sendto(resp, caller)

    async def handler(self, data, caller):
        logger.debug('handler called')

        # async is needed for `await` and `async for` with external libraries such as motor, etc.
        # do some awaits
        data = await echo(data)

        # simple echo
        return (data, caller)

    def datagram_received(self, data, addr):
        logger.debug('datagram_received called')

        # handler is selected by the data header
        handler = self.handler

        # how do I run the async handler?
        loop = asyncio.get_event_loop()
        c = handler(data, addr)  # coroutine

        f = asyncio.ensure_future(c, loop=loop)  # attach coroutine to loop
        f.add_done_callback(self.respond)

        # How do I get the response here?

        # I can't call loop.run_until_complete(...) because loop.run_forever() is running

        # def wakeup():
        #     pass
        # loop.call_soon(wakeup)
        # without this call_soon the future is not executed in my original program,
        # but it works in the test and after refactoring


def main(HOST, PORT):
    loop = asyncio.get_event_loop()
    t = asyncio.Task(loop.create_datagram_endpoint(
        Protocol, local_addr=(HOST, PORT)))
    transport, server = loop.run_until_complete(t)

    sock = transport.get_extra_info('socket')
    # socket tuning here

    try:
        loop.run_forever()
    finally:
        transport.close()
        loop.close()


logging.basicConfig(level=logging.DEBUG)
main('0.0.0.0', 10012)

Test with netcat: nc -u 127.0.0.1 10012
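As an alternative to netcat, the server can also be exercised from Python. The following is only a minimal sketch using the standard `socket` module; the helper name `send_datagram` and its `timeout` default are hypothetical and not part of the original question.

```python
import socket


def send_datagram(host, port, payload, timeout=3.0):
    """Send one UDP datagram and return the first reply, or None on timeout."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(payload, (host, port))
        try:
            reply, _addr = sock.recvfrom(4096)
        except socket.timeout:
            # No reply arrived within `timeout` seconds.
            return None
        return reply
```

With the server from the question running, send_datagram('127.0.0.1', 10012, b'hello') should return the payload prefixed with `>`, since echo() prepends that byte.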

Best answer

You can use asyncio.Queue to put results on a queue in one coroutine (handler) and await them in another coroutine (respond):

class Protocol(asyncio.DatagramProtocol):

    def __init__(self):
        # The explicit loop= argument was removed in Python 3.10;
        # the queue uses the running event loop.
        self.data_queue = asyncio.Queue()
        asyncio.ensure_future(self.respond())

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        # Schedule the coroutine; the callback itself stays synchronous.
        asyncio.ensure_future(self.handler(data, addr))

    async def respond(self):
        while True:
            resp, caller = await self.data_queue.get()
            self.transport.sendto(resp, caller)

    async def handler(self, data, caller):
        data = await echo(data)
        # Queue.put() is a coroutine and must be awaited
        await self.data_queue.put((data, caller))
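The question also asks how to set a timeout for the handler. One possible approach, sketched here under assumptions, is to wrap the handler call in `asyncio.wait_for` inside a small coroutine that also sends the reply, so the caller address no longer has to travel through the handler's return value. The names `TimeoutProtocol`, `handle_and_respond`, and `HANDLER_TIMEOUT` are hypothetical, and silently dropping slow datagrams is just one policy among several.

```python
import asyncio


async def echo(data):
    # Stand-in for an external async library call, as in the question.
    await asyncio.sleep(0.01)
    return b'>' + data


class TimeoutProtocol(asyncio.DatagramProtocol):
    # Hypothetical setting (seconds); not part of the original code.
    HANDLER_TIMEOUT = 1.0

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        # Schedule the coroutine from the synchronous callback; addr is passed
        # as a plain argument instead of being returned by the handler.
        asyncio.ensure_future(self.handle_and_respond(data, addr))

    async def handle_and_respond(self, data, addr):
        try:
            resp = await asyncio.wait_for(self.handler(data), self.HANDLER_TIMEOUT)
        except asyncio.TimeoutError:
            return  # assumed policy: drop the datagram if the handler is too slow
        self.transport.sendto(resp, addr)

    async def handler(self, data):
        return await echo(data)
```

This class can be passed to loop.create_datagram_endpoint in place of Protocol. Because each datagram gets its own task, a slow handler delays only its own reply.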

Regarding "python - What is the correct way to call async code from a regular callback?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40241039/
