gpt4 book ai didi

python - 如何使用 Python 检查 Celery/Supervisor 是否正在运行

转载 作者:IT老高 更新时间:2023-10-28 21:02:26 24 4
gpt4 key购买 nike

如果 celery 在机器(Ubuntu)上运行,如何用 Python 编写脚本输出?

我的用例。我有一个包含一些任务的简单 python 文件。我没有使用 Django 或 Flask。我使用主管来运行任务队列。例如,

tasks.py

from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
return a + b

主管:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

这一切都有效,我现在想要有一个页面来检查 celery/supervisor 进程是否正在运行。即这样的事情可能使用 Flask 允许我托管页面提供 200 状态允许我进行负载平衡。

例如...

check_status.py

from flask import Flask

app = Flask(__name__)

@app.route('/')
def status_check():

#check supervisor is running
if supervisor:
return render_template('up.html')
else:
return render_template('down.html')

if __name__ == '__main__':
app.run()

最佳答案

更新 09/2020:Jérôme 在此处更新了 Celery 4.3 的此答案:https://stackoverflow.com/a/57628025/1159735

您可以通过导入celery.bin.celery包通过代码运行celery status命令:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms

app = celery.Celery('tasks', broker='redis://')

status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()

def celery_is_up():
try:
status.run()
return True
except celery.bin.base.Error as e:
if e.status == celery.platforms.EX_UNAVAILABLE:
return False
raise e

if __name__ == '__main__':
if celery_is_up():
print('Celery up!')
else:
print('Celery not responding...')

关于python - 如何使用 Python 检查 Celery/Supervisor 是否正在运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33446350/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com