gpt4 book ai didi

python - 异步队列消费者协程

转载 作者:太空狗 更新时间:2023-10-29 17:07:38 25 4
gpt4 key购买 nike

我有一个 asyncio.Protocol 子类从服务器接收数据。我将此数据(每一行,因为数据是文本)存储在 asyncio.Queue 中。

import asyncio

q = asyncio.Queue()

class StreamProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.transport = None

def connection_made(self, transport):
self.transport = transport

def data_received(self, data):
for message in data.decode().splitlines():
yield q.put(message.rstrip())

def connection_lost(self, exc):
self.loop.stop()

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
'127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

我想有另一个协程负责消费队列中的数据并进行处理。

  • 这应该是一个asyncio.Task吗?
  • 如果队列因为几秒钟没有收到数据而变空怎么办?如何确保我的消费者不会停止 (run_until_complete)?
  • 有没有比为我的队列使用全局变量更简洁的方法?

最佳答案

Should this be a asyncio.Task?

是的,使用 asyncio.ensure_future 创建它或 loop.create_task .

What if the queue becomes empty because for a few seconds no data is received?

只需使用 queue.get等待一个项目可用:

async def consume(queue):
while True:
item = await queue.get()
print(item)

Is there a cleaner way than using a global variable for my queue?

是的,只需将它作为参数传递给消费者协程和流协议(protocol):

class StreamProtocol(asyncio.Protocol):
def __init__(self, loop, queue):
self.loop = loop
self.queue = queue

def data_received(self, data):
for message in data.decode().splitlines():
self.queue.put_nowait(message.rstrip())

def connection_lost(self, exc):
self.loop.stop()

How can I make sure my consumer doesn't stop (run_until_complete)?

连接关闭后,使用queue.join等待队列为空。


完整示例:

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
# Connection coroutine
factory = lambda: StreamProtocol(loop, queue)
connection = loop.create_connection(factory, '127.0.0.1', '42')
# Consumer task
consumer = asyncio.ensure_future(consume(queue))
# Set up connection
loop.run_until_complete(connection)
# Wait until the connection is closed
loop.run_forever()
# Wait until the queue is empty
loop.run_until_complete(queue.join())
# Cancel the consumer
consumer.cancel()
# Let the consumer terminate
loop.run_until_complete(consumer)
# Close the loop
loop.close()

或者,您也可以使用 streams :

async def tcp_client(host, port, loop=None):
reader, writer = await asyncio.open_connection(host, port, loop=loop)
async for line in reader:
print(line.rstrip())
writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop))
loop.close()

关于python - 异步队列消费者协程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35127520/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com