python - Multiple consumers in RabbitMQ for multiple queues

Reposted. Author: 行者123. Updated: 2023-12-04 16:16:12

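I have 2 queues, say q1 and q2, which correspond to exchanges e1 and e2 with binding keys b1 and b2. I want to run consumer functions, say c1 and c2, in parallel, listening to q1 and q2 respectively. I tried the following: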

import pika
import constants  # the asker's own module that holds rmqHostIp

# `callback` is defined elsewhere in the asker's project.
# Keyword names below follow pika 1.x (exchange_type, on_message_callback, auto_ack).

def c1():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e1', exchange_type='topic', durable=True)
    result = channel.queue_declare(queue='q1', durable=False)
    queue_name = result.method.queue
    binding_key = "b1"
    channel.queue_bind(exchange='e1',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
    channel.start_consuming()

def c2():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp))
    channel = connection.channel()
    channel.exchange_declare(exchange='e2', exchange_type='topic', durable=True)
    result = channel.queue_declare(queue='q2', durable=False)
    queue_name = result.method.queue
    binding_key = "b2"
    channel.queue_bind(exchange='e2',
                       queue=queue_name,
                       routing_key=binding_key)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
    channel.start_consuming()

if __name__ == '__main__':
    c1()
    c2()

However, only c1 ends up listening; c2 is never executed. How can I run both functions? Thanks in advance.

EDIT: I have the methods c1 and c2 in 2 different modules (files).

Best answer

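To run both functions at the same time, some form of multithreading (or multiprocessing) is needed: c1() blocks inside start_consuming(), so when the two functions are called one after the other, c2() is never reached. Please see here for some Python examples.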
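A minimal sketch of the idea, without pika or a broker. `blocking_consumer` is a hypothetical stand-in for a function that ends in `start_consuming()`, and the `stop` event is an assumption added only so the demo terminates. Calling two such functions one after the other would never reach the second; giving each its own thread lets both run.

```python
import threading


def blocking_consumer(name, seen, stop):
    """Hypothetical stand-in for c1/c2: keeps looping until told to stop,
    much as start_consuming() blocks until the consumer is cancelled."""
    while True:
        seen.add(name)          # record that this consumer is running
        if stop.wait(0.01):     # returns True once stop has been set
            break


def run_in_threads(names, duration=0.2):
    """Run one blocking consumer per thread and return the names that ran."""
    seen = set()
    stop = threading.Event()
    threads = [
        threading.Thread(target=blocking_consumer, args=(name, seen, stop))
        for name in names
    ]
    for t in threads:
        t.start()
    stop.wait(duration)         # let the consumers work for a moment
    stop.set()                  # ask every consumer to return
    for t in threads:
        t.join()
    return seen


if __name__ == '__main__':
    print(sorted(run_in_threads(['c1', 'c2'])))
```

The same shape applies to real consumers: each blocking call gets its own thread or process, and the main program only starts them and waits.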

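Here is your code modified to use the Process class. You could also use threads, or start each consumer explicitly as a separate process from the operating system.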

import pika
from multiprocessing import Process


def callback(ch, method, properties, body):
    print('callback got data: %r' % body)
    # auto_ack is False, so each message has to be acknowledged explicitly
    ch.basic_ack(delivery_tag=method.delivery_tag)


class c1():
    def run(self):
        # Connect inside run() so the connection is created in the child process
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.exchange_declare(exchange='e1', exchange_type='topic', durable=True)
        result = channel.queue_declare(queue='q1', durable=False)
        queue_name = result.method.queue
        binding_key = "b1"
        channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key)
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
        channel.start_consuming()  # blocks until the consumer is cancelled


class c2():
    def run(self):
        # Same as c1, but for exchange e2, queue q2 and binding key b2
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.exchange_declare(exchange='e2', exchange_type='topic', durable=True)
        result = channel.queue_declare(queue='q2', durable=False)
        queue_name = result.method.queue
        binding_key = "b2"
        channel.queue_bind(exchange='e2', queue=queue_name, routing_key=binding_key)
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
        channel.start_consuming()


if __name__ == '__main__':
    subscriber_list = []
    subscriber_list.append(c1())
    subscriber_list.append(c2())

    # execute: one process per subscriber
    process_list = []
    for sub in subscriber_list:
        process = Process(target=sub.run)
        process.start()
        process_list.append(process)

    # wait for all processes to finish
    for process in process_list:
        process.join()
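
The thread-based alternative could look roughly like the sketch below. It assumes pika 1.x argument names and a broker on localhost; the `CONSUMERS` table, the `consume` and `build_threads` helpers, and the `host` parameter are hypothetical names introduced for illustration. Each thread opens its own connection, because a pika connection should not be shared between threads.

```python
import threading

# Hypothetical table of (exchange, queue, binding key), using the names from the question.
CONSUMERS = [('e1', 'q1', 'b1'), ('e2', 'q2', 'b2')]


def callback(ch, method, properties, body):
    print('%s got %r' % (method.routing_key, body))
    # auto_ack is False below, so acknowledge each message explicitly
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consume(exchange, queue, binding_key, host='localhost'):
    """Open a connection owned by the calling thread and block on one queue."""
    import pika  # imported lazily so this module loads even where pika is missing

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
    channel.queue_declare(queue=queue, durable=False)
    channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key)
    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)
    channel.start_consuming()  # blocks this thread only


def build_threads(consumers=CONSUMERS):
    """Create (but do not start) one thread per consumer, named after its queue."""
    return [
        threading.Thread(target=consume, args=spec, name=spec[1])
        for spec in consumers
    ]


if __name__ == '__main__':
    threads = build_threads()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

Compared with the Process version, threads share one interpreter and are cheaper to start, while separate processes isolate the consumers from each other if one of them crashes.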

Regarding python - multiple consumers in RabbitMQ for multiple queues, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45132921/
