gpt4 book ai didi

python - Pika 心跳终止连接

转载 作者:行者123 更新时间:2023-12-04 17:20:07 26 4
gpt4 key购买 nike

我有一个代码,它只是用 pika 将消息排队到代理的队列中。

class Publisher:

def __init__(self, config):
self._params = ConnectionParameters(
host = config.RABBITMQ_HOST,
credentials = PlainCredentials(config.RABBITMQ_USER, config.RABBITMQ_PASSWORD))
self._conn = None
self._channel = None
self.exchange_name = config.RABBITMQ_AGENT_EXCHANGE


def connect(self):
if not self._conn or self._conn.is_closed:
self._conn = BlockingConnection(self._params)
self._channel = self._conn.channel()
self._channel.exchange_declare(exchange=self.exchange_name, exchange_type = 'topic')

def _publish(self, task):
properties = BasicProperties(expiration=task.expiration_ms)
self._channel.basic_publish(exchange= self.exchange_name,
routing_key = task.routing_key,
properties = properties if task.has_expiration else None,
body=dumps(task, cls = TaskEncoder).encode())
logging.debug('message sent: %s', task)


def publish(self, msg):
"""Publish msg, reconnecting if necessary."""

try:
self._publish(msg)
except ConnectionClosed:
logging.error('reconnecting to queue')
self.connect()
self._publish(msg)
Pika 停止排队消息以与下一条消息建立长期运行的连接,并且不再抛出任何错误
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.heartbeat [INFO] - Connection is idle, 1 stale byte intervals
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)>
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.184.108', 41024), raddr=('10.100.176.158', 5672)
代码使用
publisher = Publisher(config)
publisher.connect()
while True:
publisher.publish(obj)
time.sleep(1)
我有两个问题:
如何预防?在这种情况下,禁用心跳是否可以工作?
如何使用防火墙重现/模拟此行为?我尝试在 RMQ 端口添加丢包规则,但没有成功。
鼠兔版本:1.0.1
RMQ 版本:3.8.9
python :3.8.6

最佳答案

有很多帖子建议保持回调执行很短,所以控制应该回到 pika。
see more on it here
如果您有大的事情要计算,最好将它们跨越到另一个线程/进程,或者将它们累积起来供以后执行。
这个概念帮助我使用其他实时 api。
似乎发生的是兔子正在关闭 channel (如果您的手在其他地方不忙,您可能会在回调中发现它),并且在我看来心跳被鼠兔停止了。

  • 友情链接
  • 关于python - Pika 心跳终止连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66689571/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com