- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在用 python 编写基于 celerybeat-mongo 的 celerybeat 自定义调度程序类与 mongodb 一起工作的项目。
实际上,我正在尝试使用 couchbase 而不是 mongodb 来完成这项工作。我还编写了一个 ScheduleEntry 类和一个 Scheduler 类,我从 couchbase 文档中获取调度程序列表并将其解析为 ScheduleEntry 对象等...
但是当我按照 link 中的说明运行它时, 似乎什么都没发生
celery -A <my.task.file> beat -S <my.scheduler.CouchBaseScheduler>
我对 celery 还很陌生,我已经运行了一些带有任务的 worker,但我不太清楚调度程序是如何工作的。 Celerybeat 启动良好,我只知道它从数据库中正确读取了我的调度程序,但是尽管我在 tasks.py 文件中指出了任务,但似乎没有调用任何任务。
我的方向正确吗?上面的命令行可以吗?我如何调试它,因为我唯一的运行方式是从命令行(使用子进程从脚本启动它并调试它会很脏)。
编辑:我添加了一些关于此的细节:
首先,我在 tasks.py 文件中写了一个基本任务:
import celery
import os
from datetime import datetime
from celery.utils.log import get_logger
def log_task_info(task_name, process_index, init_date):
# logger.warn(task_name + ': ' + str(process_index) + ':' + str(init_date) + ' : ' + str(os.getpid()) + ':' +
# str(datetime.now()))
get_logger(__name__).warning(task_name + ': ' + str(process_index) +
':' + str(init_date) + ' : ' + str(os.getpid()) + ':' + str(datetime.now()))
@celery.task(name='tasks.heartbeat')
def heartbeat():
log_task_info('heartbeat', os.getpid(), datetime.now())
return "Hello!"
然后我将 Scheduler 和 SchedulerEntry 类子类化。
class CouchBaseScheduler(Scheduler):
UPDATE_INTERVAL = datetime.timedelta(seconds=5)
Entry = CouchBaseScheduleEntry
host = "192.168.59.103"
port = "8091"
bucket = "celery"
doc_string = "scheduler_list"
password = "1234"
scheduleCount = 0
def __init__(self, *args, **kwargs):
if hasattr(current_app.conf, "CELERY_COUCHBASE_SCHEDULER_BUCKET"):
bucket_str = current_app.conf.CELERY_COUCHBASE_SCHEDULER_BUCKET
else:
bucket_str = "celery"
if hasattr(current_app.conf, "CELERY_COUCHBASE_SCHEDULER_URL"):
cnx_string = "{}/{}".format(current_app.conf.CELERY_COUCHBASE_SCHEDULER_URL, bucket_str)
else:
cnx_string = "couchbase://{}:{}/{}".format(self.host, self.port, self.bucket)
try:
self.bucket = Bucket(cnx_string, password=self.password, quiet=True)
self.couchcel = CouchBaseCelery(self.bucket, self.doc_string)
get_logger(__name__).info("backend scheduler using %s", cnx_string)
self._schedule = {}
self._last_updated = None
Scheduler.__init__(self, *args, **kwargs)
self.max_interval = (kwargs.get('max_interval')
or self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or 5)
except AuthError:
get_logger(__name__).error("Couchbase connection %s failed : Auth failed!", cnx_string)
except CouchbaseError as cbe:
get_logger(__name__).debug("Couchbase connection %s failed : %s", cnx_string, type(cbe))
def setup_schedule(self):
pass
def requires_update(self):
if not self._last_updated:
return True
return self._last_updated + self.UPDATE_INTERVAL < datetime.datetime.now()
def get_from_database(self):
self.sync()
try:
get_logger(__name__).info("Getting scheduler list from couchbase.")
couch_scheduler_list = self.couchcel.get_scheduler_list()
return couch_scheduler_list
except Exception as e:
get_logger(__name__).error("Could not get scheduler list from couchbase: {}".format(e))
@property
def schedule(self):
# self.scheduleCount += 1
# get_logger(__name__).info("Scheduling {}".format(self.scheduleCount))
if self.requires_update():
get_logger(__name__).info("Schedule {} requires update".format(self.scheduleCount))
self._schedule = self.get_from_database()
self._last_updated = datetime.datetime.now()
return self.schedule
def sync(self):
for entry in self._schedule.values():
entry.save(self.couchcel)
和
class CouchBaseScheduleEntry(ScheduleEntry):
def __init__(self, taskid, task):
self._task = task
self.app = current_app._get_current_object()
self._id = taskid
get_logger(__name__).info("Task id: {} processing".format(self._id))
try:
if all(k in self._task for k in ('name', 'task', 'enabled')):
self.name = self._task['name']
self.task = self._task['task']
else:
raise Exception("Field name, task or enabled are mandatory!")
self.args = self._task['args']
self.kwargs = self._task['kwargs']
self.options = self._task['options']
if 'interval' in self._task and 'crontab' in self._task:
raise Exception("Cannot define both interval and crontab schedule")
if 'interval' in self._task:
interval = self._task['interval']
if interval['period'] in PERIODS:
self.schedule = self._interval_schedule(interval['period'], interval['every'])
get_logger(__name__).info("Task contains interval")
else:
raise Exception("The value of an interval must be {}".format(PERIODS))
elif 'crontab' in self._task:
crontab = self._task['crontab']
self.schedule = self._crontab_schedule(crontab)
get_logger(__name__).info("Task contains crontab")
else:
raise Exception("You must define interval or crontab schedule")
if self._task['total_run_count'] is None:
self._task['total_run_count'] = 0
self.total_run_count = self._task['total_run_count']
get_logger(__name__).info("Task total run count: {}".format(self.total_run_count))
if not self._task['last_run_at']:
self._task['last_run_at'] = self._default_now()
else:
self._task['last_run_at'] = datetime.datetime.strptime(self._task['last_run_at'], DATEFORMAT)
self.last_run_at = self._task['last_run_at']
get_logger(__name__).info("Task last run at: {}".format(self.last_run_at))
except KeyError as ke:
print('Key not valid: {}'.format(ke))
def _default_now(self):
return self.app.now()
def next(self):
self._task['last_run_at'] = self.app.now()
self._task['total_run_count'] += 1
self._task['run_immediately'] = False
get_logger(__name__).info("NEXT!")
return self.__class__(self._task)
__next__ = next
def is_due(self):
if not self._task['enabled']:
return False, 5.0 # 5 secs delay for reenable
if self._task['run_immediately']:
# figure out when the schedule would run next anyway
_, n = self.schedule.is_due(self.last_run_at)
return True, n
return self.schedule.is_due(self.last_run_at)
def _crontab_schedule(self, crontab):
return celery.schedules.schedule(minute=crontab['minute'],
hour=crontab['hour'],
day_of_week=crontab['day_of_week'],
day_of_month=crontab['day_of_month'],
month_of_year=crontab['month_of_year'])
def _interval_schedule(self, period, every):
return celery.schedules.schedule(datetime.timedelta(**{period: every}))
def __repr__(self):
return '<CouchBaseScheduleEntry ({0} {1}(*{2}, **{3}) {{4}})>'.format(
self.name, self.task, self.args,
self.kwargs, self.schedule
)
def reserve(self, entry):
new_entry = Scheduler.reserve(self, entry)
return new_entry
@property
def getid(self):
return self._id
@property
def gettaskdict(self):
return self._task
def tojson(self):
return json.dumps(self.tocouchdict())
def save(self, couchcel):
get_logger(__name__).info("Saving task {} in couchbase".format(self._id))
if self.total_run_count > self._task['total_run_count']:
self._task['total_run_count'] = self.total_run_count
get_logger(__name__).error("{}, {}".format(self.last_run_at, self._task['last_run_at']))
try:
if self.last_run_at and self._task['last_run_at'] \
and self.last_run_at > self._task['last_run_at']:
self._task['last_run_at'] = self.last_run_at
except TypeError:
if self.last_run_at and self._task['last_run_at'] \
and self.last_run_at > datetime.datetime.strptime(self._task['last_run_at'], DATEFORMAT):
self._task['last_run_at'] = self.last_run_at
self._task['run_immediately']= False
couchcel.save_scheduler(self)
couchcel 对象用于数据库访问,ScheduleEntry 对象解析来自 couchbase 文档的数据。
最好的问候
最佳答案
关于 Celery website 的文档似乎有点误导。如果你看here ,您可以看到命令行选项 -S
设置状态数据库,而不是 worker 的调度程序。
尝试使用 --scheduler
选项运行它:
celery -A <my.task.file> beat --scheduler <my.scheduler.CouchBaseScheduler>
关于python - 如何使用 celerybeat 测试自定义调度程序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33215721/
好的,所以我编辑了以下... 只需将以下内容放入我的 custom.css #rt-utility .rt-block {CODE HERE} 但是当我尝试改变... 与 #rt-sideslid
在表格 View 中,我有一个自定义单元格(在界面生成器中高度为 500)。在该单元格中,我有一个 Collection View ,我按 (10,10,10,10) 固定到边缘。但是在 tablev
对于我的无能,我很抱歉,但总的来说,我对 Cocoa、Swift 和面向对象编程还很陌生。我的主要来源是《Cocoa Programming for OS X》(第 5 版),以及 Apple 的充满
我正在使用 meta-tegra 为我的 NVIDIA Jetson Nano 构建自定义图像。我需要 PyTorch,但没有它的配方。我在设备上构建了 PyTorch,并将其打包到设备上的轮子中。现
在 jquery 中使用 $.POST 和 $.GET 时,有没有办法将自定义变量添加到 URL 并发送它们?我尝试了以下方法: $.ajax({type:"POST", url:"file.php?
Traefik 已经默认实现了很多中间件,可以满足大部分我们日常的需求,但是在实际工作中,用户仍然还是有自定义中间件的需求,为解决这个问题,官方推出了一个 Traefik Pilot[1] 的功
我想让我的 CustomTextInputLayout 将 Widget.MaterialComponents.TextInputLayout.OutlinedBox 作为默认样式,无需在 XML 中
我在 ~/.emacs 中有以下自定义函数: (defun xi-rgrep (term) (grep-compute-defaults) (interactive "sSearch Te
我有下表: 考虑到每个月的权重,我的目标是在 5 个月内分散 10,000 个单位。与 10,000 相邻的行是我最好的尝试(我在这上面花了几个小时)。黄色是我所追求的。 我试图用来计算的逻辑如下:计
我的表单中有一个字段,它是文件类型。当用户点击保存图标时,我想自然地将文件上传到服务器并将文件名保存在数据库中。我尝试通过回显文件名来测试它,但它似乎不起作用。另外,如何将文件名添加到数据库中?是在模
我有一个 python 脚本来发送电子邮件,它工作得很好,但问题是当我检查我的电子邮件收件箱时。 我希望该用户名是自定义用户名,而不是整个电子邮件地址。 最佳答案 发件人地址应该使用的格式是: You
我想减小 ggcorrplot 中标记的大小,并减少文本和绘图之间的空间。 library(ggcorrplot) data(mtcars) corr <- round(cor(mtcars), 1)
GTK+ noob 问题在这里: 是否可以自定义 GtkFileChooserButton 或 GtkFileChooserDialog 以删除“位置”部分(左侧)和顶部的“位置”输入框? 我实际上要
我正在尝试在主页上使用 ajax 在 magento 中使用 ajax 显示流行的产品列表,我可以为 5 或“N”个产品执行此操作,但我想要的是将分页工具栏与结果集一起添加. 这是我添加的以显示流行产
我正在尝试使用 PasswordResetForm 内置函数。 由于我想要自定义表单字段,因此我编写了自己的表单: class FpasswordForm(PasswordResetForm):
据我了解,新的 Angular 7 提供了拖放功能。我搜索了有关 DnD 的 Tree 组件,但没有找到与树相关的内容。 我在 Stackblitz 上找到的一个工作示例.对比drag'ndrop功能
我必须开发一个自定义选项卡控件并决定使用 WPF/XAML 创建它,因为我无论如何都打算学习它。完成后应该是这样的: 到目前为止,我取得了很好的进展,但还有两个问题: 只有第一个/最后一个标签项应该有
我要定制xtable用于导出到 LaTeX。我知道有些问题是关于 xtable在这里,但我找不到我要找的具体东西。 以下是我的表的外观示例: my.table <- data.frame(Specif
用ejs在这里显示日期 它给我结果 Tue Feb 02 2016 16:02:24 GMT+0530 (IST) 但是我需要表现为 19th January, 2016 如何在ejs中执行此操作?
我想问在 JavaFX 中使用自定义对象制作 ListView 的最佳方法,我想要一个每个项目如下所示的列表: 我搜了一下,发现大部分人都是用细胞工厂的方法来做的。有没有其他办法?例如使用客户 fxm
我是一名优秀的程序员,十分优秀!