- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Python中线程的MQ消息队列实现以及消息队列的优点解析由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由
Python的消息队列示例:
1.threading+Queue实现线程队列 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/env python
import
Queue
import
threading
import
time
queue
=
Queue.Queue()
class
ThreadNum(threading.Thread):
"""没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
def
__init__(
self
, queue):
threading.Thread.__init__(
self
)
self
.queue
=
queue
def
run(
self
):
whileTrue:
#消费者端,从队列中获取num
num
=
self
.queue.get()
print
"i'm num %s"
%
(num)
time.sleep(
1
)
#在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
self
.queue.task_done()
start
=
time.time()
def
main():
#产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
for
i
in
range
(
10
):
t
=
ThreadNum(queue)
t.setDaemon(
True
)
t.start()
#往队列中填错数据
for
num
in
range
(
10
):
queue.put(num)
#wait on the queue until everything has been processed
queue.join()
main()
print
"Elapsed Time: %s"
%
(time.time()
-
start)
|
运行结果:
1
2
3
4
5
6
7
8
9
10
11
|
i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896
|
解读: 具体工作步骤描述如下: 1,创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。 2,将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。 3,生成守护线程池。 4,每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。 5,在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。 6,对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。 在使用这个模式时需要注意一点:通过将守护线程设置为 true,程序运行完自动退出。好处是在退出之前,可以对队列执行 join 操作、或者等到队列为空.
2.多个队列 所谓多个队列,一个队列的输出可以作为另一个队列的输入! 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
#!/usr/bin/env python
import
Queue
import
threading
import
time
queue
=
Queue.Queue()
out_queue
=
Queue.Queue()
class
ThreadNum(threading.Thread):
"""bkeep"""
def
__init__(
self
, queue, out_queue):
threading.Thread.__init__(
self
)
self
.queue
=
queue
self
.out_queue
=
out_queue
def
run(
self
):
whileTrue:
#从队列中取消息
num
=
self
.queue.get()
bkeep
=
num
#将bkeep放入队列中
self
.out_queue.put(bkeep)
#signals to queue job is done
self
.queue.task_done()
class
PrintLove(threading.Thread):
"""Threaded Url Grab"""
def
__init__(
self
, out_queue):
threading.Thread.__init__(
self
)
self
.out_queue
=
out_queue
def
run(
self
):
whileTrue:
#从队列中获取消息并赋值给bkeep
bkeep
=
self
.out_queue.get()
keke
=
"I love "
+
str
(bkeep)
print
keke,
print
self
.getName()
time.sleep(
1
)
#signals to queue job is done
self
.out_queue.task_done()
start
=
time.time()
def
main():
#populate queue with data
for
num
in
range
(
10
):
queue.put(num)
#spawn a pool of threads, and pass them queue instance
for
i
in
range
(
5
):
t
=
ThreadNum(queue, out_queue)
t.setDaemon(
True
)
t.start()
for
i
in
range
(
5
):
pl
=
PrintLove(out_queue)
pl.setDaemon(
True
)
pl.start()
#wait on the queue until everything has been processed
queue.join()
out_queue.join()
main()
print
"Elapsed Time: %s"
%
(time.time()
-
start)
|
运行结果:
1
2
3
4
5
6
7
8
9
10
11
|
I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098
|
解读: ThreadNum 类工作流程 定义队列--->继承threading---->初始化queue---->定义run函数--->get queue中的数据---->处理数据---->put数据到另外一个queue-->发信号告诉queue该条处理完毕 main函数工作流程: --->往自定义queue中扔数据 --->for循环确定启动的线程数---->实例化ThreadNum类---->启动线程并设置守护 --->for循环确定启动的线程数---->实例化PrintLove类--->启动线程并设置为守护 --->等待queue中的消息处理完毕后执行join。即退出主程序.
。
了解了MQ的大概实现以后,我们来总结一下消息队列的优点: 1. 解耦 。
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束.
2. 冗余 。
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕.
3. 扩展性 。
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单.
4. 灵活性 & 峰值处理能力 。
当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为 以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。 请查看我们关于峰值处理能力的博客文章了解更多此方面的信息.
5. 可恢复性 。
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别.
6. 送达保证 。
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进 程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的 表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理.
7.排序保证 。
在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置.
8.缓冲 。
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度.
9. 理解数据流 。
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化.
10. 异步通信 。
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们.
最后此篇关于Python中线程的MQ消息队列实现以及消息队列的优点解析的文章就讲到这里了,如果你想了解更多关于Python中线程的MQ消息队列实现以及消息队列的优点解析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我们目前使用 MQ Explorer 来管理 Z/OS 上的 WebSphere MQ V7。前几天误删了一个队列。后来我想回顾一下历史并查看一些日志以了解它究竟发生在何时。我的问题是,MQ Expl
默认情况下是否安装了扩展事务客户端?如何验证是否已安装?我如何安装这个? 最佳答案 在先前版本的 MQ 客户端中,它当然不包含在标准客户端中(事务客户端具有成本/许可影响)。 但是,从 WMQ v7.
我有 2 个队列,比如 Q1 和 Q2。当我使用 MQPUT 将消息插入 Q1 时,有什么方法可以将此消息复制到 Q2 中吗? WMQ 是否支持队列复制? 注意:队列驻留在不同的队列管理器上。 谢谢,
我有一个进程,它使用 JMSTemplate 根据 JMS header 值有选择地从 MQ 队列中出列。 当出队查询匹配队列前面的消息时,出队速率约为 60-70 条消息/秒。但是,当查询仅匹配 5
Source LogPrimaryFiles=3|2-254 (Windows)|2-510 (UNIX systems) The log files allocated when the queue
在Websphere MQ系列中,队列管理器的命令级别是701。它实际上指定了什么? 最佳答案 WebSphere 产品使用“[版本].[发行版].[修改].[修订包]”命名约定。例如,7.0.1.6
在哪里可以找到 IBM MQ 版本 V8.0.0.5 和 V9.0 之间的区别?我试图在 IBM 网站上查找它,但没有成功。 最佳答案 IBM 的 v9 知识中心页面“What's new in Ve
我已经在我的机器上安装了 MQ(已经用 regedit32 检查过)但是当我在命令提示符下键入“runmqsc”时出现错误“无法识别命令”(为 mqjms.jar 设置了环境变量)我是什么失踪 ?我想
我在我的系统中安装了 MQ V8.0.0.2,我正在应用修复包以使用静默安装方法将其升级到 8.0.0.5。它运行成功并完成,但 dspmqver仍然说版本为 8.0.0.2。 它在 64 位的 Wi
我们有一个场景,我们希望 node.js 应用程序使用来自后端系统的消息,该后端系统当前将消息放入 Websphere MQ 队列(通过 SAP PI)。 在 MQ 8.0.0.3 中,有一个 AMQ
我们有消息通过 WebSphere MQ 队列传入。我们需要很长时间才能收到消息。 是否有一种简单易行的方法来跟踪收到/提取消息的时间? 最佳答案 发送消息后,您可以请求确认交货。当消息被消费时,一条
我想将文件系统中的文件加载到 WebSphere MQ 队列。有几个支持 pacs - Q Program和 MO03: WebSphere MQ Queue Load / Unload Utilit
有人使用过 RPG 中的 MQ 吗?问题如下。队列中有几条消息。它们都带有 RFH2 header 。每个 header 都包含一组 NameValueData。我正在创建消息句柄并将其传递给 MQG
我有一组 IBM MQ 队列管理器,想知道其中一个何时重新启动或何时自动故障转移到备用实例。 队列管理器位于 AIX 上 问候, 最佳答案 您可以从 AMQERR01.LOG 中找到此信息。队列管理器
作为我们应用程序安装的一部分,我需要将一堆 xml 消息放入一个 MQ 队列中。为了使它更复杂,消息需要设置 RFH2 header 的 usr 文件夹。 我发现 mqput2.exe来自 IBM R
我一直在研究变幻莫测的 channel 状态、它们如何进入这些状态以及如何停止或启动它们。我现在已经有了相当扎实的理解,但是一位同事提出了 channel 重置的话题。 当我无法解释发生了什么时,我偶
我从 MQ 安全演示文稿中看到一项建议,如果您不需要命令服务器,它会关闭它。我的问题是如何确定我是否真的需要它。 从我的角度来看,如果没有运行目标 QMGR 的管理程序,例如 MQ Explorer
是否可以保留已检索且不再位于队列中的消息历史记录(包含消息内容将是完美的)? 在应用程序中,我可以看到发送者何时尝试将消息放入队列以及接收者何时尝试拾取消息,但我想查看消息何时真正到达队列以及消息何时
有没有办法找到在特定时间段内通过 IBM websphere MQ 队列管理器的消息总数? 最佳答案 这听起来像是 MQ 记帐和统计功能的完美使用。除此之外,这些功能还记录消息数量(具有持久和非持久计
我正在向远程队列发送消息,但我无法控制该队列。 我发送一个 xml 文件作为消息,但是当应用程序读取消息时,它会得到一个消息头,例如 jms_text \0\0\0lqueue:///TEST128
我是一名优秀的程序员,十分优秀!