python - Celery worker disconnects from broker

Reposted | Author: 行者123 | Updated: 2023-12-05 06:23:25

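I'm using Python, RabbitMQ and Celery to distribute tasks to workers. Each task takes about 15 minutes and is 99% CPU-bound. My system has 24 cores, and whenever my worker runs one of these tasks, I get a broker connection error: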

[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
[...]
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

I found several other posts about this problem, but none of them fixed it. Any idea how I can solve this, especially under heavy CPU load?

Windows 10 (worker)

macOS 10.14 (RabbitMQ Server)

Python 3.7

Celery 4.3.0 (rhubarb)

RabbitMQ 3.7.16 (Erlang 22.0.7)

My configuration makes the worker consume only one task at a time, and the worker process is restarted after every job, but still no luck:

CELERYD_MAX_TASKS_PER_CHILD = 1,
CELERYD_CONCURRENCY = 1,
CELERY_TASK_RESULT_EXPIRES=3600,
CELERYD_PREFETCH_MULTIPLIER = 1,
CELERY_MAX_CACHED_RESULTS = 1,
CELERY_ACKS_LATE = True,
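For comparison, here is a sketch of the same six settings under the lowercase names introduced in Celery 4.0. Celery 4.3 still reads the uppercase names above, but its documentation advises against mixing the two styles in one configuration. The module name celeryconfig is a hypothetical placeholder; a module like this would be loaded with app.config_from_object("celeryconfig"). Only the spelling changes, not the behavior.

```python
# celeryconfig.py (hypothetical module name): the question's settings
# spelled with the lowercase names used by Celery 4.x.
# Load with: app.config_from_object("celeryconfig")

worker_max_tasks_per_child = 1  # was CELERYD_MAX_TASKS_PER_CHILD: replace the pool process after each task
worker_concurrency = 1          # was CELERYD_CONCURRENCY: a single pool process
result_expires = 3600           # was CELERY_TASK_RESULT_EXPIRES: keep results for one hour (seconds)
worker_prefetch_multiplier = 1  # was CELERYD_PREFETCH_MULTIPLIER: reserve one message per process
result_cache_max = 1            # was CELERY_MAX_CACHED_RESULTS
task_acks_late = True           # was CELERY_ACKS_LATE: acknowledge after the task finishes, not before
```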

Here is the full traceback:

[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 318, in start
blueprint.start(self)
File "C:\Python37\lib\site-packages\celery\bootsteps.py", line 119, in start
step.start(parent)
File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 596, in start
c.loop(*c.loop_args())
File "C:\Python37\lib\site-packages\celery\worker\loops.py", line 118, in synloop
qos.update()
File "C:\Python37\lib\site-packages\kombu\common.py", line 442, in update
return self.set(self.value)
File "C:\Python37\lib\site-packages\kombu\common.py", line 435, in set
self.callback(prefetch_count=new_value)
File "C:\Python37\lib\site-packages\celery\worker\consumer\tasks.py", line 47, in set_prefetch_count
apply_global=qos_global,
File "C:\Python37\lib\site-packages\kombu\messaging.py", line 558, in qos
apply_global)
File "C:\Python37\lib\site-packages\amqp\channel.py", line 1853, in basic_qos
wait=spec.Basic.QosOk,
File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 68, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 88, in wait
self.connection.drain_events(timeout=timeout)
File "C:\Python37\lib\site-packages\amqp\connection.py", line 504, in drain_events
while not self.blocking_read(timeout):
File "C:\Python37\lib\site-packages\amqp\connection.py", line 509, in blocking_read
frame = self.transport.read_frame()
File "C:\Python37\lib\site-packages\amqp\transport.py", line 252, in read_frame
frame_header = read(7, True)
File "C:\Python37\lib\site-packages\amqp\transport.py", line 438, in _read
s = recv(n - len(rbuf))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

Best answer

I found a workaround for this problem. I think the problem lies with the Celery backend; in my case I was using Redis.

Here is my configuration:

Broker - rabbitmq
Backend - redis
Python - 3.7
OS - Windows 10
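In Celery's own configuration, the broker/backend split listed above corresponds to the broker_url and result_backend settings. The sketch below is illustrative only: the host names, credentials and Redis database index are placeholders that do not come from the answer, and the ports are the services' defaults. Passing the same URLs as Celery("tasks", broker=..., backend=...) would be equivalent.

```python
# Hypothetical connection settings: RabbitMQ as the broker, Redis as the
# result backend. Host names, user/password and the DB index are placeholders.

# AMQP URL for RabbitMQ (default port 5672); the trailing "//" selects
# the default virtual host "/".
broker_url = "amqp://user:password@rabbitmq-host:5672//"

# Redis URL for storing task results (default port 6379, database 0).
result_backend = "redis://redis-host:6379/0"
```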

On the Celery client side, I poll the worker's Celery status from the client every 60 seconds. With this in place, I no longer ran into the connection reset problem.

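from time import sleep

while not doors_res.ready():  # check the task's state every 60 seconds
    sleep(60)
result = doors_res.get()  # fetch the return value once the task has finished

where doors_res is the AsyncResult returned when the task was sent through the Celery instance app.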
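The polling loop above can be wrapped in a small helper. This is a sketch, not the answerer's code: wait_for_result and FakeResult are hypothetical names, and the helper assumes only that the result object has ready() and get() methods, as celery.result.AsyncResult does. It adds an optional upper bound on the total wait, and the sleep function is injectable so the loop can be tried offline. Whether polling actually prevents the reset is the answerer's observation; the helper itself guarantees nothing about the broker connection.

```python
import time
from typing import Any, Callable, List, Optional


def wait_for_result(
    async_result: Any,
    interval: float = 60.0,
    max_wait: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll async_result.ready() every `interval` seconds, then return .get().

    Raises TimeoutError once `max_wait` seconds of polling have passed
    without the task finishing (None means wait indefinitely).
    """
    waited = 0.0
    while not async_result.ready():
        if max_wait is not None and waited >= max_wait:
            raise TimeoutError(f"task still not ready after {waited:.0f} s")
        sleep(interval)
        waited += interval
    return async_result.get()


class FakeResult:
    """Hypothetical stand-in for AsyncResult, for trying the loop offline.

    ready() returns False for the first `polls_needed` calls, then True.
    """

    def __init__(self, polls_needed: int, value: Any) -> None:
        self.polls_left = polls_needed
        self.value = value

    def ready(self) -> bool:
        if self.polls_left == 0:
            return True
        self.polls_left -= 1
        return False

    def get(self) -> Any:
        return self.value


# Offline demo: record the requested sleeps instead of actually sleeping.
slept: List[float] = []
value = wait_for_result(FakeResult(3, "done"), interval=60, sleep=slept.append)
```

With a real task, the call would be result = wait_for_result(doors_res). Note that AsyncResult.get() on its own already blocks until the task finishes; the explicit loop only makes the 60-second interval visible and allows an optional time limit.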

On the Celery worker side:

celery worker -A <celery_file_name> -l info -P gevent

My task ran for about 2 hours and I did not get a connection reset error.

A similar question, "python - Celery worker disconnects from broker", can be found on Stack Overflow: https://stackoverflow.com/questions/58356264/
