gpt4 book ai didi

python - queue.Queue 上的多路复用?

转载 作者:IT老高 更新时间:2023-10-28 13:07:29 28 4
gpt4 key购买 nike

如何在多个 queue.Queue 上进行“选择”同时?

Golang 有 desired feature及其 channel :

select {
case i1 = <-c1:
print("received ", i1, " from c1\n")
case c2 <- i2:
print("sent ", i2, " to c2\n")
case i3, ok := (<-c3): // same as: i3, ok := <-c3
if ok {
print("received ", i3, " from c3\n")
} else {
print("c3 is closed\n")
}
default:
print("no communication\n")
}

其中第一个解除阻塞的 channel 执行相应的阻塞。我如何在 Python 中实现这一点?

更新0

根据 the linktux21b's answer 中给出,所需的队列类型具有以下属性:

  • 多生产者/多消费者队列 (MPMC)
  • 提供每个生产者的 FIFO/LIFO
  • 当队列为空/满时,消费者/生产者会被阻塞

此外, channel 可能会被阻塞,生产者会一直阻塞,直到消费者检索到该项目。我不确定 Python 的 Queue 可以做到这一点。

最佳答案

如果您使用 queue.PriorityQueue,您可以使用 channel 对象作为优先级获得类似的行为:

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
format="%(threadName)s - %(message)s")

class ChannelManager(object):
next_priority = 0

def __init__(self):
self.queue = PriorityQueue()
self.channels = []

def put(self, channel, item, *args, **kwargs):
self.queue.put((channel, item), *args, **kwargs)

def get(self, *args, **kwargs):
return self.queue.get(*args, **kwargs)

@contextmanager
def select(self, ordering=None, default=False):
if default:
try:
channel, item = self.get(block=False)
except Empty:
channel = 'default'
item = None
else:
channel, item = self.get()
yield channel, item


def new_channel(self, name):
channel = Channel(name, self.next_priority, self)
self.channels.append(channel)
self.next_priority += 1
return channel


class Channel(object):
def __init__(self, name, priority, manager):
self.name = name
self.priority = priority
self.manager = manager

def __str__(self):
return self.name

def __lt__(self, other):
return self.priority < other.priority

def put(self, item):
self.manager.put(self, item)


if __name__ == '__main__':
num_channels = 3
num_producers = 4
num_items_per_producer = 2
num_consumers = 3
num_items_per_consumer = 3

manager = ChannelManager()
channels = [manager.new_channel('Channel#{0}'.format(i))
for i in range(num_channels)]

def producer_target():
for i in range(num_items_per_producer):
time.sleep(random.random())
channel = random.choice(channels)
message = random.choice(string.ascii_letters)
logging.info('Putting {0} in {1}'.format(message, channel))
channel.put(message)

producers = [threading.Thread(target=producer_target,
name='Producer#{0}'.format(i))
for i in range(num_producers)]
for producer in producers:
producer.start()
for producer in producers:
producer.join()
logging.info('Producers finished')

def consumer_target():
for i in range(num_items_per_consumer):
time.sleep(random.random())
with manager.select(default=True) as (channel, item):
if channel:
logging.info('Received {0} from {1}'.format(item, channel))
else:
logging.info('No data received')

consumers = [threading.Thread(target=consumer_target,
name='Consumer#{0}'.format(i))
for i in range(num_consumers)]
for consumer in consumers:
consumer.start()
for consumer in consumers:
consumer.join()
logging.info('Consumers finished')

示例输出:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

在本例中,ChannelManager 只是 queue.PriorityQueue 的一个包装器,它将 select 方法实现为 contextmanager 使其看起来类似于 Go 中的 select 语句。

需要注意的几点:

  • 排序

    • 在 Go 示例中, channel 在 select 语句中的写入顺序决定了如果有多个 channel 可用的数据,将执行哪个 channel 的代码。

    • 在 python 示例中,顺序由分配给每个 channel 的优先级决定。但是,优先级可以动态分配给每个 channel (如示例中所示),因此可以使用更复杂的 select 方法更改顺序,该方法负责根据参数分配新的优先级到方法。此外,一旦上下文管理器完成,旧的排序可能会重新建立。

  • 屏蔽

    • 在 Go 示例中,如果存在 default 情况,则 select 语句将阻塞。

    • 在 python 示例中,必须将一个 bool 参数传递给 select 方法,以明确何时需要阻塞/非阻塞。在非阻塞情况下,上下文管理器返回的 channel 只是字符串 'default' 因此在里面的代码中很容易在 with 里面的代码中检测到这一点> 声明。

  • 线程:queue 模块中的对象已经为多生产者、多消费者场景做好了准备,如示例中所示。

关于python - queue.Queue 上的多路复用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8456516/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com