- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用kombu通过生产者/消费者模型来管理RabbitMQ。我启动了我的生产者,该生产者将100个作业放在一个队列中(我只有一个队列和一个交换)。我想同时启动多个消费者,并让每个消费者一次处理一份工作。不幸的是,这些消费者彼此阻塞(即,当一个消费者从队列中抢到一份工作时,其他消费者只是闲着)。如果我杀死了工作中的消费者,那么其他消费者中的一个就会加入并开始工作。有没有办法让所有使用者同时运行,每个使用者处理队列中的不同作业?我的消费者代码如下:
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message
最佳答案
您需要将使用者预取设置为1(https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos),这样每个使用者将只抓取1条消息,并将其余消息保留在队列中,状态为就绪,因此,如果您有2个使用者,并且QOS设置为1并且您有100消息,您将同时处理2个任务。
我已将缺少的部分添加到您的代码中,以设置预取计数
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
channel = self.rabbitmq_connection.channel()
channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message
关于python - Python Kombu-阻止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23947119/
使用 Kombu 和 RabbitMQ 来实现经典的发布/订阅设计模式。我创建了一个创建主题的生产者: from kombu import Connection, Exchange, Queue me
我正在使用kombu通过生产者/消费者模型来管理RabbitMQ。我启动了我的生产者,该生产者将100个作业放在一个队列中(我只有一个队列和一个交换)。我想同时启动多个消费者,并让每个消费者一次处理一
我正在测试昆布的工作原理。我计划在几个项目中替换 pika。我看到 kombu 有很多文档,但是使用我在文档中找到的内容,一些消息丢失了。这是代码: from kombu import Connect
尝试让它工作:Django 1.6.11 + Celery 3.1.19 + Kombu 3.0.33 在 Ubuntu 14.04 上使用教程 http://docs.celeryproject.o
我正在将 Django-kombu 与 Celery 一起使用,但在很多地方都读到它还没有准备好生产。 基本上,我想使用 Celery 创建一个多主-多从架构,并在它们之间传递消息,然后返回到执行调用
我正在使用 Redis 上的 Kombu 组合生产者/消费者设置,但我遇到了问题。如果我启动一个消费者,然后使用 range(10000) 启动生产者,我可以确认生产者已将所有 10k 项目排队,但并
问题 我有一个 RabbitMQ Server作为我的一个系统的队列中心。在过去一周左右的时间里,它的生产商每隔几个小时就会完全停止生产。 我尝试了什么 蛮力 停止消费者会释放锁几分钟,但随后会阻止返
我想将图像上传到 S3 服务器,但在上传之前我想生成 3 种不同大小的缩略图,我希望它在请求/响应周期之外完成,因此我使用的是 celery。我已经阅读了文档,这是我的理解。如果我错了,请纠正我。 C
我用 django 做了一个网站,我用 celery 做异步任务,当我运行时: ./manage.py runserver 我得到了错误: Traceback (most recent call la
基于 puckel 的 Airflow v1.9.0 docker 部署因为这个错误而让我崩溃: Traceback (most recent call last): File "/usr/loc
基于 puckel 的 Airflow v1.9.0 docker 部署因为这个错误而让我崩溃: Traceback (most recent call last): File "/usr/loc
documentation建议根据 AMQP 规范将额外的 AMQP 属性作为关键字参数传递给 publish,但 correlationId="foo" 似乎没有达到预期的效果。 最佳答案 如果你
我正在用django运行一个应用程序,我想用celery做一些定时任务。 根据oficial docs ,在我的 settings.py 文件中我设置了代理传输 BROKER_URL = 'djang
我有一个 RabbitMQ 交换器和一个队列。我希望创建一个运行多个线程并尽快处理此队列的守护进程。 “工作”涉及与外部服务的通信,因此每个消费者内部都会发生相当多的阻塞。因此,我希望有多个线程都处理
我正在使用带有 fastAPI 的 celery 。 获取 无法解码消息正文:ContentDisallowed('拒绝反序列化 json 类型的不受信任的内容 (application/json)'
编辑: 主要问题是第 3 方 rabbitmq 机器似乎时不时地终止空闲连接。那是我开始收到“Broken Pipe”异常的时候。获得通讯的唯一途径。恢复正常对我来说是终止进程并重新启动它们。我认为有
我按照第一步使用 Celery (Django) 并尝试在后台运行繁重的进程。我安装了 RabbitMQ 服务器。然而,当我尝试时,celery -A my_app worker -l info 它抛
我有 django 1.11.5 应用程序和 celery 4.1.0,我一直收到: kombu.exceptions.EncodeError: is not JSON serializable 我
如果我运行以下代码,回调(测试)传递给 consumer永远不会被触发。 但是,如果我留意 rabbitmq GUI,我会看到消息已被检索(但未确认)。所以看起来消费者正在收到消息,但没有将其传递给我
我正在尝试使用 Celery 和 RabbitMQ 异步发送电子邮件。这是我第一次使用 Celery,所以我对一些错误不是很熟悉。我意识到回溯来自一个名为 kombu 的包,我知道它是 Celery
我是一名优秀的程序员,十分优秀!