gpt4 book ai didi

python - 每个 celery worker 的定期任务

转载 作者:行者123 更新时间:2023-12-01 07:28:07 27 4
gpt4 key购买 nike

我正在寻找一种方法来对每个 celery worker 执行特定类型的任务。确切的用例是定期运行状况作业,确保成功任务初始化和进展的各种先决条件(这些指标报告给不同的服务)。例如,确保可以建立与数据库的连接。

我发现远程控制和检查命令可用于此目的(通过一些固定的调度),但当 AWS SQS 用作后端代理时,它们不受支持。

知道如何在不向 fork 流程任务添加任何内存占用的情况下实现这一目标吗?也许通过在工作进程中启动另一个线程?

最佳答案

为了解决这个问题,我使用了 Celery custom worker bootstep 。它在启动时注册一个计划任务,每 X 秒将一个健康检查任务忽略到工作执行池中。

该解决方案与后端代理无关,并利用定制的工作线程执行池。

class WorkerHealthMonitor(bootsteps.StartStopStep):
requires = {'celery.worker.components:Timer',
'celery.worker.components:Pool'}

def __init__(self, worker, **kwargs):
self.tref = None
self.interval = 60

def start(self, worker):
logger.info("Registering health monitor timer with %d seconds interval", self.interval)
self.tref = worker.timer.call_repeatedly(
self.interval, schedule_health_check, (worker,), priority=10,
)

def stop(self, worker):
if self.tref:
self.tref.cancel()
self.tref = None


def schedule_health_check(worker):
worker.pool.apply_async(health_check, callback=health_check_completed)


def health_check(**kwargs):
logger.info('Running Health Check...')
return 'I am alive'


def health_check_completed(result):
logger.info('Health check completed with msg: %s', result)

任务注册:

app = Celery('tasks', broker=BROKER_URL, backend=BACKEND_URL)
app.steps['worker'].add(WorkerHealthMonitor)

关于python - 每个 celery worker 的定期任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57346058/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com