gpt4 book ai didi

Python操作RabbitMQ服务器实现消息队列的路由功能

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 30 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Python操作RabbitMQ服务器实现消息队列的路由功能由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server),这里我们来看一下MQ相关的路由功能.

路由键的实现 。

比如有一个需要给所有接收端发送消息的场景,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,要怎么办呢?这种情况下就要用到路由键了.

路由键的工作原理:每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键。发送端通过交换机发送信息时,可以指明路由键 ,交换机会根据路由键把消息发送到相应的消息队列,这样接收端就能接收到消息了.

这边继上一篇,还是用send.py和receive.py来模拟实现路由键的功能。send.py表示发送端,receive.py表示接收端。实例的功能就是将info、warning、error三种级别的信息发送到不同的接收端.

send.py代码分析 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost' ))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange = 'messages' , type = 'direct' )
 
#定义三个路由键
routings = [ 'info' , 'warning' , 'error' ]
 
#将消息依次发送到交换机,并设置路由键
for routing in routings:
   message = '%s message.' % routing
   channel.basic_publish(exchange = 'messages' ,
              routing_key = routing,
              body = message)
   print message
 
connection.close()

receive.py代码分析 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost' ))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange = 'messages' , type = 'direct' )
 
#从命令行获取路由键参数,如果没有,则设置为info
routings = sys.argv[ 1 :]
if not routings:
   routings = [ 'info' ]
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue
for routing in routings:
   channel.queue_bind(exchange = 'messages' ,
             queue = queue_name,
             routing_key = routing)
 
def callback(ch, method, properties, body):
   print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue = queue_name, no_ack = True )
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

打开两个终端,一个运行代码python receive.py info warning,表示只接收info和warning的消息。另外一个终端运行send.py,可以观察到接收终端只接收到了info和warning的消息。如果打开多个终端运行receive.py,并传入不同的路由键参数,可以看到更明显的效果.

当接收端正在运行时,可以使用rabbitmqctl list_bindings来查看绑定情况.

路由键模糊匹配 路由键模糊匹配,就是可以使用正则表达式,和常用的正则表示式不同,这里的话“#”表示所有、全部的意思;“*”只匹配到一个词。看完示例就能明白了.

这边继上面的例子,还是用send.py和receive.py来实现路由键模糊匹配的功能。send.py表示发送端,receive.py表示接收端。实例的功能大概是这样:比如你有个知心好朋友,不管开心、伤心、工作上的还是生活上的事情都可以和她说;还有一些朋友可以分享开心的事情;还有一些朋友,你可以把不开心的事情和她说.

send.py代码分析 。

因为要进行路由键模糊匹配,所以交换机的类型要设置为topic,设置为topic,就可以使用#,*的匹配符号了.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost' ))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange = 'messages' , type = 'topic' )
 
#定义路由键
routings = [ 'happy.work' , 'happy.life' , 'sad.work' , 'sad.life' ]
 
#将消息依次发送到交换机,并设定路由键
for routing in routings:
   message = '%s message.' % routing
   channel.basic_publish(exchange = 'messages' ,
              routing_key = routing,
              body = message)
   print message
 
connection.close()

上例中定义了四种类型的消息,容易理解,就不解释了,然后依次发送出去.

receive.py代码分析 。

同样,交换机的类型要设定为topic就可以了。从命令行接收参数的功能稍微调整了一下,就是没有参数时报错退出.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost' ))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange = 'messages' , type = 'topic' )
 
#从命令行获取路由参数,如果没有,则报错退出
routings = sys.argv[ 1 :]
if not routings:
   print >> sys.stderr, "Usage: %s [routing_key]..." % (sys.argv[ 0 ],)
   exit()
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue
for routing in routings:
   channel.queue_bind(exchange = 'messages' ,
             queue = queue_name,
             routing_key = routing)
 
def callback(ch, method, properties, body):
   print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue = queue_name, no_ack = True )
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

打开四个终端,一个运行如下,表示任何事情都可以和她说:

?
1
python receive.py "#"

另外一个终端 运行如下,表示可以和她分享开心的事:

?
1
python receive.py "happy.*"

第三个运行如下,表示工作上的事情可以和她分享:

?
1
python receive.py "*.work"

最后一个运行python send.py。结果不难想象出来,就不贴出来了.

最后此篇关于Python操作RabbitMQ服务器实现消息队列的路由功能的文章就讲到这里了,如果你想了解更多关于Python操作RabbitMQ服务器实现消息队列的路由功能的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com