- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
编辑:
主要问题是第 3 方 rabbitmq 机器似乎时不时地终止空闲连接。那是我开始收到“Broken Pipe”异常的时候。获得通讯的唯一途径。恢复正常对我来说是终止进程并重新启动它们。我认为有更好的方法吗?
--
我有点迷路了。我正在连接到第 3 方 RabbitMQ 服务器以将消息推送到。时不时地,他们机器上的所有套接字都会掉线,我最终会收到“Broken Pipe”异常。
我被告知要在我的代码中实现心跳检查,但我不确定具体如何实现。我在这里找到了一些信息:http://kombu.readthedocs.org/en/latest/changelog.html#version-2-3-0但没有真正的示例代码。
我只需要在连接字符串中添加“?heartbeat=x”吗? Kombu 会做剩下的事情吗?我看到我需要在“x/2”调用“Connection.heartbeat_check()”。我应该创建一个周期性任务来调用它吗?如何重新建立连接?
我正在使用:
我的代码现在看起来像这样。一个简单的 Celery 任务被调用以将消息发送到第 3 方 RabbitMQ 服务器(删除了日志记录和评论以使其简短,足够基本):
class SendMessageTask(Task):
name = "campaign.backends.send"
routing_key = "campaign.backends.send"
ignore_result = True
default_retry_delay = 60 # 1 minute.
max_retries = 5
def run(self, send_to, message, **kwargs):
payload = "Testing message"
try:
conn = BrokerConnection(
hostname=HOSTNAME,
port=PORT,
userid=USER_ID,
password=PASSWORD,
virtual_host=VHOST
)
with producers[conn].acquire(block=True) as producer:
publish = conn.ensure(producer, producer.publish, errback=sending_errback, max_retries=3)
publish(
body=payload,
routing_key=OUT_ROUTING_KEY,
delivery_mode=2,
exchange=EXCHANGE,
serializer=None,
content_type='text/xml',
content_encoding = 'utf-8'
)
except Exception, ex:
print ex
感谢所有帮助。
最佳答案
虽然您当然可以为生产者添加心跳支持,但它对消费者进程更有意义。
启用心跳意味着你必须定期发送心跳,例如如果心跳设置为 1 秒,则必须每秒或更长时间发送一次心跳,否则远程将关闭连接。
这意味着您必须使用单独的线程或使用异步 io 来及时可靠地发送心跳,并且由于无法在线程之间共享连接,因此我们只能使用异步 io。
好消息是,将心跳添加到纯生产连接可能不会给您带来太多好处。
关于python - 使用 Kombu 设置 Rabbit MQ Heartbeat,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14581986/
使用 Kombu 和 RabbitMQ 来实现经典的发布/订阅设计模式。我创建了一个创建主题的生产者: from kombu import Connection, Exchange, Queue me
我正在使用kombu通过生产者/消费者模型来管理RabbitMQ。我启动了我的生产者,该生产者将100个作业放在一个队列中(我只有一个队列和一个交换)。我想同时启动多个消费者,并让每个消费者一次处理一
我正在测试昆布的工作原理。我计划在几个项目中替换 pika。我看到 kombu 有很多文档,但是使用我在文档中找到的内容,一些消息丢失了。这是代码: from kombu import Connect
尝试让它工作:Django 1.6.11 + Celery 3.1.19 + Kombu 3.0.33 在 Ubuntu 14.04 上使用教程 http://docs.celeryproject.o
我正在将 Django-kombu 与 Celery 一起使用,但在很多地方都读到它还没有准备好生产。 基本上,我想使用 Celery 创建一个多主-多从架构,并在它们之间传递消息,然后返回到执行调用
我正在使用 Redis 上的 Kombu 组合生产者/消费者设置,但我遇到了问题。如果我启动一个消费者,然后使用 range(10000) 启动生产者,我可以确认生产者已将所有 10k 项目排队,但并
问题 我有一个 RabbitMQ Server作为我的一个系统的队列中心。在过去一周左右的时间里,它的生产商每隔几个小时就会完全停止生产。 我尝试了什么 蛮力 停止消费者会释放锁几分钟,但随后会阻止返
我想将图像上传到 S3 服务器,但在上传之前我想生成 3 种不同大小的缩略图,我希望它在请求/响应周期之外完成,因此我使用的是 celery。我已经阅读了文档,这是我的理解。如果我错了,请纠正我。 C
我用 django 做了一个网站,我用 celery 做异步任务,当我运行时: ./manage.py runserver 我得到了错误: Traceback (most recent call la
基于 puckel 的 Airflow v1.9.0 docker 部署因为这个错误而让我崩溃: Traceback (most recent call last): File "/usr/loc
基于 puckel 的 Airflow v1.9.0 docker 部署因为这个错误而让我崩溃: Traceback (most recent call last): File "/usr/loc
documentation建议根据 AMQP 规范将额外的 AMQP 属性作为关键字参数传递给 publish,但 correlationId="foo" 似乎没有达到预期的效果。 最佳答案 如果你
我正在用django运行一个应用程序,我想用celery做一些定时任务。 根据oficial docs ,在我的 settings.py 文件中我设置了代理传输 BROKER_URL = 'djang
我有一个 RabbitMQ 交换器和一个队列。我希望创建一个运行多个线程并尽快处理此队列的守护进程。 “工作”涉及与外部服务的通信,因此每个消费者内部都会发生相当多的阻塞。因此,我希望有多个线程都处理
我正在使用带有 fastAPI 的 celery 。 获取 无法解码消息正文:ContentDisallowed('拒绝反序列化 json 类型的不受信任的内容 (application/json)'
编辑: 主要问题是第 3 方 rabbitmq 机器似乎时不时地终止空闲连接。那是我开始收到“Broken Pipe”异常的时候。获得通讯的唯一途径。恢复正常对我来说是终止进程并重新启动它们。我认为有
我按照第一步使用 Celery (Django) 并尝试在后台运行繁重的进程。我安装了 RabbitMQ 服务器。然而,当我尝试时,celery -A my_app worker -l info 它抛
我有 django 1.11.5 应用程序和 celery 4.1.0,我一直收到: kombu.exceptions.EncodeError: is not JSON serializable 我
如果我运行以下代码,回调(测试)传递给 consumer永远不会被触发。 但是,如果我留意 rabbitmq GUI,我会看到消息已被检索(但未确认)。所以看起来消费者正在收到消息,但没有将其传递给我
我正在尝试使用 Celery 和 RabbitMQ 异步发送电子邮件。这是我第一次使用 Celery,所以我对一些错误不是很熟悉。我意识到回溯来自一个名为 kombu 的包,我知道它是 Celery
我是一名优秀的程序员,十分优秀!