- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Python中线程的MQ消息队列实现以及消息队列的优点解析由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由
Python的消息队列示例:
1.threading+Queue实现线程队列 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#!/usr/bin/env python
import
Queue
import
threading
import
time
queue
=
Queue.Queue()
class
ThreadNum(threading.Thread):
"""没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
def
__init__(
self
, queue):
threading.Thread.__init__(
self
)
self
.queue
=
queue
def
run(
self
):
whileTrue:
#消费者端,从队列中获取num
num
=
self
.queue.get()
print
"i'm num %s"
%
(num)
time.sleep(
1
)
#在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
self
.queue.task_done()
start
=
time.time()
def
main():
#产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
for
i
in
range
(
10
):
t
=
ThreadNum(queue)
t.setDaemon(
True
)
t.start()
#往队列中填错数据
for
num
in
range
(
10
):
queue.put(num)
#wait on the queue until everything has been processed
queue.join()
main()
print
"Elapsed Time: %s"
%
(time.time()
-
start)
|
运行结果:
1
2
3
4
5
6
7
8
9
10
11
|
i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896
|
解读: 具体工作步骤描述如下: 1,创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。 2,将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。 3,生成守护线程池。 4,每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。 5,在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。 6,对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。 在使用这个模式时需要注意一点:通过将守护线程设置为 true,程序运行完自动退出。好处是在退出之前,可以对队列执行 join 操作、或者等到队列为空.
2.多个队列 所谓多个队列,一个队列的输出可以作为另一个队列的输入! 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
#!/usr/bin/env python
import
Queue
import
threading
import
time
queue
=
Queue.Queue()
out_queue
=
Queue.Queue()
class
ThreadNum(threading.Thread):
"""bkeep"""
def
__init__(
self
, queue, out_queue):
threading.Thread.__init__(
self
)
self
.queue
=
queue
self
.out_queue
=
out_queue
def
run(
self
):
whileTrue:
#从队列中取消息
num
=
self
.queue.get()
bkeep
=
num
#将bkeep放入队列中
self
.out_queue.put(bkeep)
#signals to queue job is done
self
.queue.task_done()
class
PrintLove(threading.Thread):
"""Threaded Url Grab"""
def
__init__(
self
, out_queue):
threading.Thread.__init__(
self
)
self
.out_queue
=
out_queue
def
run(
self
):
whileTrue:
#从队列中获取消息并赋值给bkeep
bkeep
=
self
.out_queue.get()
keke
=
"I love "
+
str
(bkeep)
print
keke,
print
self
.getName()
time.sleep(
1
)
#signals to queue job is done
self
.out_queue.task_done()
start
=
time.time()
def
main():
#populate queue with data
for
num
in
range
(
10
):
queue.put(num)
#spawn a pool of threads, and pass them queue instance
for
i
in
range
(
5
):
t
=
ThreadNum(queue, out_queue)
t.setDaemon(
True
)
t.start()
for
i
in
range
(
5
):
pl
=
PrintLove(out_queue)
pl.setDaemon(
True
)
pl.start()
#wait on the queue until everything has been processed
queue.join()
out_queue.join()
main()
print
"Elapsed Time: %s"
%
(time.time()
-
start)
|
运行结果:
1
2
3
4
5
6
7
8
9
10
11
|
I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098
|
解读: ThreadNum 类工作流程 定义队列--->继承threading---->初始化queue---->定义run函数--->get queue中的数据---->处理数据---->put数据到另外一个queue-->发信号告诉queue该条处理完毕 main函数工作流程: --->往自定义queue中扔数据 --->for循环确定启动的线程数---->实例化ThreadNum类---->启动线程并设置守护 --->for循环确定启动的线程数---->实例化PrintLove类--->启动线程并设置为守护 --->等待queue中的消息处理完毕后执行join。即退出主程序.
。
了解了MQ的大概实现以后,我们来总结一下消息队列的优点: 1. 解耦 。
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束.
2. 冗余 。
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕.
3. 扩展性 。
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单.
4. 灵活性 & 峰值处理能力 。
当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为 以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。 请查看我们关于峰值处理能力的博客文章了解更多此方面的信息.
5. 可恢复性 。
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别.
6. 送达保证 。
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进 程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的 表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理.
7.排序保证 。
在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置.
8.缓冲 。
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度.
9. 理解数据流 。
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化.
10. 异步通信 。
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们.
最后此篇关于Python中线程的MQ消息队列实现以及消息队列的优点解析的文章就讲到这里了,如果你想了解更多关于Python中线程的MQ消息队列实现以及消息队列的优点解析的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
背景: 我最近一直在使用 JPA,我为相当大的关系数据库项目生成持久层的轻松程度给我留下了深刻的印象。 我们公司使用大量非 SQL 数据库,特别是面向列的数据库。我对可能对这些数据库使用 JPA 有一
我已经在我的 maven pom 中添加了这些构建配置,因为我希望将 Apache Solr 依赖项与 Jar 捆绑在一起。否则我得到了 SolarServerException: ClassNotF
interface ITurtle { void Fight(); void EatPizza(); } interface ILeonardo : ITurtle {
我希望可用于 Java 的对象/关系映射 (ORM) 工具之一能够满足这些要求: 使用 JPA 或 native SQL 查询获取大量行并将其作为实体对象返回。 允许在行(实体)中进行迭代,并在对当前
好像没有,因为我有实现From for 的代码, 我可以转换 A到 B与 .into() , 但同样的事情不适用于 Vec .into()一个Vec . 要么我搞砸了阻止实现派生的事情,要么这不应该发
在 C# 中,如果 A 实现 IX 并且 B 继承自 A ,是否必然遵循 B 实现 IX?如果是,是因为 LSP 吗?之间有什么区别吗: 1. Interface IX; Class A : IX;
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我正在阅读标准haskell库的(^)的实现代码: (^) :: (Num a, Integral b) => a -> b -> a x0 ^ y0 | y0 a -> b ->a expo x0
我将把国际象棋游戏表示为 C++ 结构。我认为,最好的选择是树结构(因为在每个深度我们都有几个可能的移动)。 这是一个好的方法吗? struct TreeElement{ SomeMoveType
我正在为用户名数据库实现字符串匹配算法。我的方法采用现有的用户名数据库和用户想要的新用户名,然后检查用户名是否已被占用。如果采用该方法,则该方法应该返回带有数据库中未采用的数字的用户名。 例子: “贾
我正在尝试实现 Breadth-first search algorithm , 为了找到两个顶点之间的最短距离。我开发了一个 Queue 对象来保存和检索对象,并且我有一个二维数组来保存两个给定顶点
我目前正在 ika 中开发我的 Python 游戏,它使用 python 2.5 我决定为 AI 使用 A* 寻路。然而,我发现它对我的需要来说太慢了(3-4 个敌人可能会落后于游戏,但我想供应 4-
我正在寻找 Kademlia 的开源实现C/C++ 中的分布式哈希表。它必须是轻量级和跨平台的(win/linux/mac)。 它必须能够将信息发布到 DHT 并检索它。 最佳答案 OpenDHT是
我在一本书中读到这一行:-“当我们要求 C++ 实现运行程序时,它会通过调用此函数来实现。” 而且我想知道“C++ 实现”是什么意思或具体是什么。帮忙!? 最佳答案 “C++ 实现”是指编译器加上链接
我正在尝试使用分支定界的 C++ 实现这个背包问题。此网站上有一个 Java 版本:Implementing branch and bound for knapsack 我试图让我的 C++ 版本打印
在很多情况下,我需要在 C# 中访问合适的哈希算法,从重写 GetHashCode 到对数据执行快速比较/查找。 我发现 FNV 哈希是一种非常简单/好/快速的哈希算法。但是,我从未见过 C# 实现的
目录 LRU缓存替换策略 核心思想 不适用场景 算法基本实现 算法优化
1. 绪论 在前面文章中提到 空间直角坐标系相互转换 ,测绘坐标转换时,一般涉及到的情况是:两个直角坐标系的小角度转换。这个就是我们经常在测绘数据处理中,WGS-84坐标系、54北京坐标系
在软件开发过程中,有时候我们需要定时地检查数据库中的数据,并在发现新增数据时触发一个动作。为了实现这个需求,我们在 .Net 7 下进行一次简单的演示. PeriodicTimer .
二分查找 二分查找算法,说白了就是在有序的数组里面给予一个存在数组里面的值key,然后将其先和数组中间的比较,如果key大于中间值,进行下一次mid后面的比较,直到找到相等的,就可以得到它的位置。
我是一名优秀的程序员,十分优秀!