gpt4 book ai didi

python - 如何使用 celerybeat 测试自定义调度程序?

转载 作者:行者123 更新时间:2023-11-28 19:16:14 25 4
gpt4 key购买 nike

我正在用 Python 编写一个 celerybeat 自定义调度程序类，参考的是 celerybeat-mongo 项目（该项目让 celerybeat 与 MongoDB 配合工作）。

实际上，我正在尝试用 Couchbase 代替 MongoDB 来实现同样的功能。我还编写了 ScheduleEntry 子类和 Scheduler 子类：从 Couchbase 文档中读取调度列表，并将其解析为 ScheduleEntry 对象等。

但是当我按照链接中的说明用下面的命令运行它时，似乎什么都没有发生：

celery -A <my.task.file> beat -S <my.scheduler.CouchBaseScheduler>

我对 Celery 还很陌生。我已经运行过一些带任务的 worker，但不太清楚调度程序是如何工作的。Celerybeat 能正常启动，并且我确认它从数据库中正确读取了调度列表，但尽管我在 tasks.py 文件中定义了任务，似乎没有任何任务被调用。

我的方向正确吗？上面的命令行对吗？我该如何调试它？目前我只能从命令行运行它（在脚本中用 subprocess 启动它再调试，会很不优雅）。

编辑:我添加了一些关于此的细节:

首先,我在 tasks.py 文件中写了一个基本任务:

import celery
import os
from datetime import datetime
from celery.utils.log import get_logger

def log_task_info(task_name, process_index, init_date):
    """Log one diagnostic line for a task run at WARNING level.

    The emitted line has the form
    ``<task_name>: <process_index>:<init_date> : <current pid>:<current time>``.

    :param task_name: label placed at the start of the line.
    :param process_index: value logged after the label (the heartbeat task
        in this file passes ``os.getpid()``).
    :param init_date: timestamp logged after ``process_index``.
    """
    # Lazy %-style arguments: the message is only formatted if the record is
    # actually emitted, and non-str values no longer break the concatenation.
    get_logger(__name__).warning('%s: %s:%s : %s:%s',
                                 task_name, process_index, init_date,
                                 os.getpid(), datetime.now())

@celery.task(name='tasks.heartbeat')
def heartbeat():
    """Celery task: log a heartbeat line for this process and greet back."""
    # Capture the pid first, then the timestamp, before logging them.
    pid = os.getpid()
    started_at = datetime.now()
    log_task_info('heartbeat', pid, started_at)
    greeting = "Hello!"
    return greeting

然后我将 Scheduler 和 SchedulerEntry 类子类化。

class CouchBaseScheduler(Scheduler):
    """Celery beat scheduler whose schedule lives in a Couchbase document.

    The schedule is re-read from Couchbase at most once per
    ``UPDATE_INTERVAL``. Before each re-read, the in-memory entries are
    written back through ``sync()``.
    """

    # How long a schedule fetched from Couchbase is treated as fresh.
    UPDATE_INTERVAL = datetime.timedelta(seconds=5)

    # Entry class for this scheduler. NOTE: a class body is evaluated at
    # definition time, so CouchBaseScheduleEntry must already be defined or
    # imported when this class is created.
    Entry = CouchBaseScheduleEntry

    # Connection defaults, used when the CELERY_COUCHBASE_SCHEDULER_* settings
    # are absent. `bucket` is replaced by the Bucket object in __init__.
    host = "192.168.59.103"
    port = "8091"
    bucket = "celery"
    doc_string = "scheduler_list"
    password = "1234"
    scheduleCount = 0

    def __init__(self, *args, **kwargs):
        """Connect to Couchbase, then initialise the base Scheduler.

        :raises AuthError: if Couchbase rejects the credentials.
        :raises CouchbaseError: on any other Couchbase connection failure.
        """
        conf = current_app.conf
        bucket_str = getattr(conf, "CELERY_COUCHBASE_SCHEDULER_BUCKET", self.bucket)
        if hasattr(conf, "CELERY_COUCHBASE_SCHEDULER_URL"):
            cnx_string = "{}/{}".format(conf.CELERY_COUCHBASE_SCHEDULER_URL, bucket_str)
        else:
            # Use the resolved bucket name so the BUCKET setting is honoured
            # even when no explicit URL is configured.
            cnx_string = "couchbase://{}:{}/{}".format(self.host, self.port, bucket_str)

        # Re-raise connection failures: swallowing them used to leave a
        # scheduler that skipped Scheduler.__init__ and silently ran nothing.
        try:
            self.bucket = Bucket(cnx_string, password=self.password, quiet=True)
            self.couchcel = CouchBaseCelery(self.bucket, self.doc_string)
        except AuthError:
            get_logger(__name__).error("Couchbase connection %s failed : Auth failed!", cnx_string)
            raise
        except CouchbaseError as cbe:
            get_logger(__name__).error("Couchbase connection %s failed : %s", cnx_string, type(cbe))
            raise

        get_logger(__name__).info("backend scheduler using %s", cnx_string)
        self._schedule = {}
        self._last_updated = None
        Scheduler.__init__(self, *args, **kwargs)
        self.max_interval = (kwargs.get('max_interval')
                             or self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or 5)

    def setup_schedule(self):
        # Nothing to set up eagerly: the schedule is loaded lazily by the
        # `schedule` property.
        pass

    def requires_update(self):
        """Return True if never loaded, or if the last load is older than UPDATE_INTERVAL."""
        if not self._last_updated:
            return True
        return self._last_updated + self.UPDATE_INTERVAL < datetime.datetime.now()

    def get_from_database(self):
        """Persist current entries, then fetch the scheduler list from Couchbase.

        On a read failure, the error is logged and the previously loaded
        schedule is returned, so callers never receive None.
        """
        self.sync()
        try:
            get_logger(__name__).info("Getting scheduler list from couchbase.")
            return self.couchcel.get_scheduler_list()
        except Exception as e:
            get_logger(__name__).error("Could not get scheduler list from couchbase: {}".format(e))
            return self._schedule

    @property
    def schedule(self):
        """The current mapping of entries, refreshed from Couchbase when stale."""
        if self.requires_update():
            get_logger(__name__).info("Schedule {} requires update".format(self.scheduleCount))
            self._schedule = self.get_from_database()
            self._last_updated = datetime.datetime.now()
        # Return the cached mapping. Returning `self.schedule` here would
        # re-enter this property and recurse until RecursionError.
        return self._schedule

    def sync(self):
        """Save every in-memory entry back to Couchbase."""
        for entry in (self._schedule or {}).values():
            entry.save(self.couchcel)

class CouchBaseScheduleEntry(ScheduleEntry):
    """Schedule entry built from one task dict stored in Couchbase.

    The raw dict is kept in ``self._task``. Bookkeeping fields are written
    into it so that ``save()`` can persist them.

    Keys read from the dict:
    - mandatory: ``name``, ``task``, ``enabled``, ``args``, ``kwargs``,
      ``options``;
    - exactly one of ``interval`` (with ``period``/``every``) or ``crontab``;
    - optional: ``total_run_count``, ``last_run_at``, ``run_immediately``.
    """

    def __init__(self, taskid, task):
        """Parse ``task`` (the Couchbase dict for document ``taskid``).

        :raises Exception: if mandatory fields are missing or the schedule
            definition is invalid.
        :raises KeyError: if a required nested key is missing (logged first).
        """
        # NOTE(review): ScheduleEntry.__init__ is not called; the attributes
        # it would normally set are assigned by hand below.
        self._task = task
        self.app = current_app._get_current_object()
        self._id = taskid
        logger = get_logger(__name__)
        logger.info("Task id: {} processing".format(self._id))
        try:
            if not all(k in self._task for k in ('name', 'task', 'enabled')):
                raise Exception("Field name, task or enabled are mandatory!")
            self.name = self._task['name']
            self.task = self._task['task']

            self.args = self._task['args']
            self.kwargs = self._task['kwargs']
            self.options = self._task['options']

            if 'interval' in self._task and 'crontab' in self._task:
                raise Exception("Cannot define both interval and crontab schedule")
            if 'interval' in self._task:
                interval = self._task['interval']
                if interval['period'] not in PERIODS:
                    raise Exception("The value of an interval must be {}".format(PERIODS))
                self.schedule = self._interval_schedule(interval['period'], interval['every'])
                logger.info("Task contains interval")
            elif 'crontab' in self._task:
                self.schedule = self._crontab_schedule(self._task['crontab'])
                logger.info("Task contains crontab")
            else:
                raise Exception("You must define interval or crontab schedule")

            # A missing or null counter means "never run".
            if self._task.get('total_run_count') is None:
                self._task['total_run_count'] = 0
            self.total_run_count = self._task['total_run_count']
            logger.info("Task total run count: {}".format(self.total_run_count))

            last_run_at = self._task.get('last_run_at')
            if not last_run_at:
                last_run_at = self._default_now()
            elif not isinstance(last_run_at, datetime.datetime):
                # Values loaded from Couchbase are strings. Entries rebuilt by
                # next() already hold a datetime, which strptime would reject.
                last_run_at = datetime.datetime.strptime(last_run_at, DATEFORMAT)
            self._task['last_run_at'] = last_run_at
            self.last_run_at = last_run_at
            logger.info("Task last run at: {}".format(self.last_run_at))
        except KeyError as ke:
            # Re-raise instead of printing: a half-built entry (e.g. with no
            # `schedule`) would only fail later, in is_due().
            logger.error('Key not valid: {}'.format(ke))
            raise

    def _default_now(self):
        return self.app.now()

    def next(self):
        """Return a new entry for the next run, with updated bookkeeping.

        The task dict is copied, so this entry is left unchanged.
        """
        task = dict(self._task)
        task['last_run_at'] = self.app.now()
        task['total_run_count'] += 1
        task['run_immediately'] = False
        get_logger(__name__).info("NEXT!")
        # The constructor needs the document id as well as the task dict.
        return self.__class__(self._id, task)

    __next__ = next

    def is_due(self):
        """Return ``(is_due, seconds_until_next_check)`` for this entry."""
        if not self._task['enabled']:
            return False, 5.0  # 5 secs delay for reenable
        if self._task.get('run_immediately'):
            # Figure out when the schedule would run next anyway.
            _, next_check = self.schedule.is_due(self.last_run_at)
            return True, next_check
        return self.schedule.is_due(self.last_run_at)

    def _crontab_schedule(self, crontab):
        """Build a crontab schedule from a dict of crontab fields."""
        # celery.schedules.schedule takes a timedelta, not crontab fields;
        # crontab is the class that accepts minute/hour/... keywords.
        return celery.schedules.crontab(minute=crontab['minute'],
                                        hour=crontab['hour'],
                                        day_of_week=crontab['day_of_week'],
                                        day_of_month=crontab['day_of_month'],
                                        month_of_year=crontab['month_of_year'])

    def _interval_schedule(self, period, every):
        """Build a fixed-interval schedule, e.g. period='seconds', every=10."""
        return celery.schedules.schedule(datetime.timedelta(**{period: every}))

    def __repr__(self):
        return '<CouchBaseScheduleEntry ({0} {1}(*{2}, **{3}) {4})>'.format(
            self.name, self.task, self.args, self.kwargs, self.schedule)

    def reserve(self, entry):
        # NOTE(review): Scheduler.reserve appears to be a scheduler method (it
        # stores next(entry) into the scheduler's schedule); calling it with
        # an entry as `self` looks misplaced. Confirm before relying on it.
        new_entry = Scheduler.reserve(self, entry)
        return new_entry

    @property
    def getid(self):
        """The Couchbase id this entry was built from."""
        return self._id

    @property
    def gettaskdict(self):
        """The raw task dict backing this entry."""
        return self._task

    def tojson(self):
        # NOTE(review): tocouchdict() is not defined in this class as shown.
        # Confirm it exists elsewhere, otherwise this raises AttributeError.
        return json.dumps(self.tocouchdict())

    def save(self, couchcel):
        """Merge newer run bookkeeping into the task dict and persist it."""
        logger = get_logger(__name__)
        logger.info("Saving task {} in couchbase".format(self._id))
        if self.total_run_count > self._task['total_run_count']:
            self._task['total_run_count'] = self.total_run_count
        # Diagnostic dump; this was previously logged at ERROR level.
        logger.debug("{}, {}".format(self.last_run_at, self._task['last_run_at']))
        stored = self._task['last_run_at']
        if self.last_run_at and stored:
            # The stored value may still be the string form read from Couchbase.
            # NOTE(review): app.now() may be timezone-aware while strptime
            # yields a naive datetime; comparing the two raises TypeError.
            if not isinstance(stored, datetime.datetime):
                stored = datetime.datetime.strptime(stored, DATEFORMAT)
            if self.last_run_at > stored:
                self._task['last_run_at'] = self.last_run_at
        self._task['run_immediately'] = False
        couchcel.save_scheduler(self)

couchcel 对象用于数据库访问,ScheduleEntry 对象解析来自 couchbase 文档的数据。

最好的问候

最佳答案

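Celery 网站上的文档似乎有点误导。如果查看这里（原文链接），可以看到命令行选项 -S 设置的是状态数据库，而不是 worker 的调度程序。（注意区分大小写：在 Celery 3.1 及之后的版本中，小写 -s 对应 --schedule，即状态数据库文件；大写 -S 对应 --scheduler，即调度程序类。直接使用长选项 --scheduler 可以避免混淆。）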

尝试使用 --scheduler 选项运行它:

celery -A <my.task.file> beat --scheduler <my.scheduler.CouchBaseScheduler>

关于python - 如何使用 celerybeat 测试自定义调度程序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33215721/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com