gpt4 book ai didi

python - 在 python asyncio 中使用协程创建多个生产者和消费者的规范方法是什么?

转载 作者:行者123 更新时间:2023-12-05 05:31:10 24 4
gpt4 key购买 nike

我正在尝试理解 Python asyncio。我有一些我认为可行的东西,但我不认为它是协程编程的规范方法。

我基本上想创建如下所示的任意处理图:

consumer producer coroutines

如何创建一个可以从多个协程接收数据的协程?是asend吗?

如何创建每个生产者发送给每个消费者的网格?

下面的代码创建了下图: each producer has multiple consumers

class Producer:

async def producer(self):
yield "started producer"
for i in range(0, 100):
for connection in self.connections:

await connection.asend(i)


class Consumer:

async def consumer(self):
yield "started consumer"
while True:
for connection in self.producers:
item = yield 0
print("item", item)



async def do():
producers = []
consumers = []
for i in range(0, 10):
producers.append(Producer())
for i in range(0, 10):
consumers.append(Consumer())

prodawait = []
consumeawait = []
for b in consumers:
cons = b.consumer()
await cons.__anext__()
consumeawait.append(cons)
for a in producers:
prod = a.producer()
prodawait.append(prod)
await prod.__anext__()


for producer in producers:
producer.connections = consumeawait
for consumer in consumers:
consumer.producers = prodawait


while True:
for consumer in consumeawait:

for item in prodawait:
await item.__anext__()
await consumer.__anext__()

import asyncio
asyncio.run(do())

最佳答案

我觉得你可以看看asyncio.Queue如何在生产者-消费者之间分配工作量。

这个例子将创建 5 个生产者和 3 个消费者,通过队列连接:

import random
import asyncio


async def producer(n, q):
for i in range(3):
await asyncio.sleep(random.randint(1, 5))
q.put_nowait(f"Producer {n} put item {i}")


async def consumer(n, q):
while True:
data = await q.get()
await asyncio.sleep(random.randint(1, 5))
print(f"Worker {n}: Processed {data=} from queue.")
q.task_done()


async def main():
q = asyncio.Queue()

producers = [asyncio.create_task(producer(n, q)) for n in range(5)]
consumers = [asyncio.create_task(consumer(n, q)) for n in range(3)]

# wait until all producers are finished
await asyncio.gather(*producers)

# wait until queue is empty
await q.join()

# cancel consumer tasks
for task in consumers:
task.cancel()

await asyncio.gather(*consumers, return_exceptions=True)

print("Done!")

打印(例如):

Worker 0: Processed data='Producer 1 put item 0' from queue.
Worker 1: Processed data='Producer 2 put item 0' from queue.
Worker 1: Processed data='Producer 3 put item 0' from queue.
Worker 2: Processed data='Producer 4 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 1' from queue.
Worker 2: Processed data='Producer 3 put item 1' from queue.
Worker 1: Processed data='Producer 1 put item 1' from queue.
Worker 0: Processed data='Producer 4 put item 1' from queue.
Worker 2: Processed data='Producer 1 put item 2' from queue.
Worker 1: Processed data='Producer 2 put item 1' from queue.
Worker 0: Processed data='Producer 0 put item 2' from queue.
Worker 2: Processed data='Producer 3 put item 2' from queue.
Worker 0: Processed data='Producer 2 put item 2' from queue.
Worker 1: Processed data='Producer 4 put item 2' from queue.
Done!

关于python - 在 python asyncio 中使用协程创建多个生产者和消费者的规范方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74420108/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com