- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
如何在多个 queue.Queue
上进行“选择”同时?
Golang 有 desired feature及其 channel :
select {
case i1 = <-c1:
print("received ", i1, " from c1\n")
case c2 <- i2:
print("sent ", i2, " to c2\n")
case i3, ok := (<-c3): // same as: i3, ok := <-c3
if ok {
print("received ", i3, " from c3\n")
} else {
print("c3 is closed\n")
}
default:
print("no communication\n")
}
其中第一个解除阻塞的 channel 执行相应的阻塞。我如何在 Python 中实现这一点?
根据 the link在 tux21b's answer 中给出,所需的队列类型具有以下属性:
此外, channel 可能会被阻塞,生产者会一直阻塞,直到消费者检索到该项目。我不确定 Python 的 Queue 可以做到这一点。
最佳答案
如果您使用 queue.PriorityQueue
,您可以使用 channel 对象作为优先级获得类似的行为:
import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager
logging.basicConfig(level=logging.NOTSET,
format="%(threadName)s - %(message)s")
class ChannelManager(object):
next_priority = 0
def __init__(self):
self.queue = PriorityQueue()
self.channels = []
def put(self, channel, item, *args, **kwargs):
self.queue.put((channel, item), *args, **kwargs)
def get(self, *args, **kwargs):
return self.queue.get(*args, **kwargs)
@contextmanager
def select(self, ordering=None, default=False):
if default:
try:
channel, item = self.get(block=False)
except Empty:
channel = 'default'
item = None
else:
channel, item = self.get()
yield channel, item
def new_channel(self, name):
channel = Channel(name, self.next_priority, self)
self.channels.append(channel)
self.next_priority += 1
return channel
class Channel(object):
def __init__(self, name, priority, manager):
self.name = name
self.priority = priority
self.manager = manager
def __str__(self):
return self.name
def __lt__(self, other):
return self.priority < other.priority
def put(self, item):
self.manager.put(self, item)
if __name__ == '__main__':
num_channels = 3
num_producers = 4
num_items_per_producer = 2
num_consumers = 3
num_items_per_consumer = 3
manager = ChannelManager()
channels = [manager.new_channel('Channel#{0}'.format(i))
for i in range(num_channels)]
def producer_target():
for i in range(num_items_per_producer):
time.sleep(random.random())
channel = random.choice(channels)
message = random.choice(string.ascii_letters)
logging.info('Putting {0} in {1}'.format(message, channel))
channel.put(message)
producers = [threading.Thread(target=producer_target,
name='Producer#{0}'.format(i))
for i in range(num_producers)]
for producer in producers:
producer.start()
for producer in producers:
producer.join()
logging.info('Producers finished')
def consumer_target():
for i in range(num_items_per_consumer):
time.sleep(random.random())
with manager.select(default=True) as (channel, item):
if channel:
logging.info('Received {0} from {1}'.format(item, channel))
else:
logging.info('No data received')
consumers = [threading.Thread(target=consumer_target,
name='Consumer#{0}'.format(i))
for i in range(num_consumers)]
for consumer in consumers:
consumer.start()
for consumer in consumers:
consumer.join()
logging.info('Consumers finished')
示例输出:
Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished
在本例中,ChannelManager
只是 queue.PriorityQueue
的一个包装器,它将 select
方法实现为 contextmanager
使其看起来类似于 Go 中的 select
语句。
需要注意的几点:
排序
在 Go 示例中, channel 在 select
语句中的写入顺序决定了如果有多个 channel 可用的数据,将执行哪个 channel 的代码。
在 python 示例中,顺序由分配给每个 channel 的优先级决定。但是,优先级可以动态分配给每个 channel (如示例中所示),因此可以使用更复杂的 select
方法更改顺序,该方法负责根据参数分配新的优先级到方法。此外,一旦上下文管理器完成,旧的排序可能会重新建立。
屏蔽
在 Go 示例中,如果存在 default
情况,则 select
语句将阻塞。
在 python 示例中,必须将一个 bool 参数传递给 select
方法,以明确何时需要阻塞/非阻塞。在非阻塞情况下,上下文管理器返回的 channel 只是字符串 'default'
因此在里面的代码中很容易在 with
里面的代码中检测到这一点> 声明。
线程:queue
模块中的对象已经为多生产者、多消费者场景做好了准备,如示例中所示。
关于python - queue.Queue 上的多路复用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8456516/
本文书接上回《反DDD模式之关系型数据库》,关注公众号(老肖想当外语大佬)获取信息: 最新文章更新; DDD框架源码(.NET、Java双平台); 加群畅聊,建模分析、技术实
我里面有 VC 和一个 collectionView。所有管理 Collection View 的代码我都放在那个 VC 的扩展中。但现在我需要在另一个不同的 VC 中使用这个 Collection
我很难重新使用子机图。 我需要重新使用我已链接到一个状态的状态机作为子机,在另一个状态中作为子机。但是当我给出对它的引用时,我得到一个空指针异常。 引用图片 我已经添加了对 GeneralTopup
我想尝试 lambda 的一些功能,并想编写一个 ArrayList 过滤器,并使用 IntStream 的方法来计算 ArrayList 中数字的平均值和最大值 我的第一个想法是过滤 ArrayLi
我正在开发一个 NodeJS 应用程序并使用 Mocha 进行单元测试。 假设我有两个非常相似的测试服。事实上,这些是针对两个类的测试实现相同的接口(interface)。 例如: suit_a.js
我正在使用 Glade 编写带有对话框的 python GUI。 如果我不使用 Glade,我会使用一个类来创建一个对话窗口 (dialag),运行它 (dialog.run),执行它所做的任何事情,
我在使用自定义单元格创建 UICollectionView 以显示项目时遇到问题。但是在 UICollectionView 刷新后,可重用的单元格填充了错误的索引 刷新前的 UICollectionV
我从 Sencha 学习 ExtJS 并有下一个简单的任务: 我的页面上有 2 个 div 在第一个 div 中我渲染 Ext.Button 在按钮上单击我想将其移动到另一个 div 仅此而已 我写了
我想在不同的 Node 模块中重用 RabbitMQ channel 。由于 channel 是异步创建的,我不确定将此 channel 对象“注入(inject)”到其他模块的最佳方法是什么。 如果
所以我的问题是我收到一个 SIGABRT 错误,其定义如下: *** Terminating app due to uncaught exception 'NSInvalidArgumentExcep
我正在编写一个 PHP 脚本来将主题从旧论坛站点迁移到新站点。 旧论坛站点使用数据库“old_forums” 新论坛站点使用数据库“new_forums” MySQL 用户“forums”拥有两个数据
我有一个使用 jcodec 生成的 MP4 文件。 然后我就有了一个使用 Android 的 MediaCodec 生成的 AAC 文件。 我想将它们混合到一个文件中,并且由于我不想将我的 Andro
我正在使用 ffmpeg 开发一个 c++ 项目。我必须生成一个带有 h264 编码的 mp4 文件。 我的问题是文件生成但是当用 VLC 读取文件时我没有图像,并用 ffprobe 分析它给我(下面
我将尝试重新提出这个问题。 我想要做的是创建一个新的 mp4 文件,其中将包括一个视频文件、两个音频文件和一个字幕文件。我想创建一个可以在我的 iOS 设备和计算机上播放的 mp4 文件。 文件如下:
虽然我的问题可能看起来很抽象,但我希望不是。假设我开发了一个应用程序,一个 ASP.NET MVC 站点,然后我的任务是为这个应用程序构建一个 Winforms 客户端,我可以从现有应用程序中重用多少
我是一名优秀的程序员,十分优秀!