- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
一旦从另一个数据源接收到消息,我的 python 脚本就必须不断地向 RabbitMQ 发送消息。 python 脚本发送它们的频率可能会有所不同,例如 1 分钟 - 30 分钟。
以下是我如何建立与 RabbitMQ 的连接:
rabt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
channel = rbt_conn.channel()
我刚遇到一个异常
pika.exceptions.ConnectionClosed
我怎样才能重新连接到它?最好的方法是什么?有什么“攻略”吗?是否可以发送 ping 以保持连接有效或设置超时?
任何指针将不胜感激。
最佳答案
RabbitMQ 使用心跳 来检测和关闭“死”连接并防止网络设备(防火墙等)终止“空闲”连接。从 3.5.5 版开始,默认超时设置为 60 秒(之前为 ~10 分钟)。来自docs :
Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable.
Pika 的 BlockingConnection 的问题在于它无法响应心跳,直到进行某些 API 调用(例如,channel.basic_publish()
、 connection.sleep()
等)。
目前我发现的方法:
RabbitMQ 在建立连接时与客户端协商超时时间。理论上,应该可以使用 heartbeat_interval
参数以更大的值覆盖服务器默认值,但当前的 Pika 版本 (0.10.0) 使用 min 值在服务器和客户端提供的那些之间。此问题已在当前 master 上修复.
另一方面,可以通过将 heartbeat_interval
参数设置为 0
来完全停用心跳功能,这很可能会让您陷入新的问题(防火墙断开连接等)
扩展@itsafire 的回答,您可以编写自己的publisher 类,让您在需要时重新连接。一个简单的实现示例:
import logging
import json
import pika
class Publisher:
EXCHANGE='my_exchange'
TYPE='topic'
ROUTING_KEY = 'some_routing_key'
def __init__(self, host, virtual_host, username, password):
self._params = pika.connection.ConnectionParameters(
host=host,
virtual_host=virtual_host,
credentials=pika.credentials.PlainCredentials(username, password))
self._conn = None
self._channel = None
def connect(self):
if not self._conn or self._conn.is_closed:
self._conn = pika.BlockingConnection(self._params)
self._channel = self._conn.channel()
self._channel.exchange_declare(exchange=self.EXCHANGE,
type=self.TYPE)
def _publish(self, msg):
self._channel.basic_publish(exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY,
body=json.dumps(msg).encode())
logging.debug('message sent: %s', msg)
def publish(self, msg):
"""Publish msg, reconnecting if necessary."""
try:
self._publish(msg)
except pika.exceptions.ConnectionClosed:
logging.debug('reconnecting to queue')
self.connect()
self._publish(msg)
def close(self):
if self._conn and self._conn.is_open:
logging.debug('closing queue connection')
self._conn.close()
我尚未探索的其他可能性:
connection.sleep()
以响应服务器心跳。关于python - 如何重新连接到 RabbitMQ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35193335/
我正在开发一个 voip 调用应用程序。我需要做的是在接到来电时将 Activity 带到前台。我在应用程序中使用 Twilio,并在收到推送消息时开始调用。 问题是我试图在接到任何电话时显示 Act
我是一名优秀的程序员,十分优秀!