gpt4 book ai didi

rabbitmq - Celery RabbitMQ 代理故障转移连接问题

转载 作者:行者123 更新时间:2023-12-02 03:05:36 25 4
gpt4 key购买 nike

我在 HA 模式的集群中有 3 个 RabbitMQ 节点。每个节点都在单独的 Docker 容器上。

我正在使用 Celery 版本 4 和 kombu 版本 4。

我已使用此命令来设置 HA 策略:

rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Celery 配置如下所示:
CELERY = dict(
broker_url=[
'amqp://guest@rabbitmq1:5672',
'amqp://guest@rabbitmq2:5672',
'amqp://guest@rabbitmq3:5672',
],
celery_queue_ha_policy='all',
...
)

一切正常,直到我停止主 RabbitMQ 应用程序以使用以下命令测试 Celery 故障转移功能:
rabbitmqctl stop_app

在 RabbitMQ 应用程序停止后,我立即开始在下面的日志中看到错误。日志消息的频率非常高,并且不会随着尝试次数的增加而减慢。

根据日志,Celery 尝试使用下一次故障转移重新连接,但它被另一次重新连接到已停止的主节点的尝试中断。同样的事情在无限循环中一遍又一遍地发生。
[2017-03-17 15:10:28,084: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:28,300: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:28,302: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:28,303: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:28,303: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:28,303: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:28,318: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:28,470: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
self.sync(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
replies = self.send_hello(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
timeout=self.timeout, reply=True,
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
limit, callback, channel=channel,
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
channel = self.channel
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
channel = self._channel = channel()
File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
value = self.__value__ = self.__contract__()
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
self.connection
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
self._connection = self._establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
conn.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
self.transport.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
self.sock.connect(sa)
File "/usr/local/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:28,508: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:28,570: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:28,572: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:28,575: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:28,648: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:28,672: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:28,673: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:28,947: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:29,345: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,506: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq2:5672//
[2017-03-17 15:10:29,535: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,569: DEBUG/MainProcess] | Consumer: Starting Events
[2017-03-17 15:10:29,682: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:29,740: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,768: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,770: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:29,770: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:29,771: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:29,795: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:29,874: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
self.sync(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
replies = self.send_hello(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
timeout=self.timeout, reply=True,
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
limit, callback, channel=channel,
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
channel = self.channel
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
channel = self._channel = channel()
File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
value = self.__value__ = self.__contract__()
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
self.connection
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
self._connection = self._establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
conn.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
self.transport.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
self.sock.connect(sa)
File "/usr/local/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:29,887: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:29,907: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:29,909: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:29,910: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:29,911: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:29,912: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:29,953: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:29,954: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:30,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

不幸的是,Celery 文档并没有对故障转移主题进行太多说明。

最佳答案

它绝对是错误,我在 GitHub 上创建了问题:https://github.com/celery/celery/issues/3921

感谢 George Psarakis,我设法使用 --without-mingle 避免了错误。 celery worker 的标志,例如:

celery worker -A app.tasks -l debug --without-mingle

关于rabbitmq - Celery RabbitMQ 代理故障转移连接问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42996655/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com