gpt4 book ai didi

python - APScheduler 如何在调度程序之外添加作业?

转载 作者:太空狗 更新时间:2023-10-29 21:58:22 26 4
gpt4 key购买 nike

我有一个简单的要求。我将 apscheduler 作为一个单独的进程运行。我有另一个 jobproducer 脚本,我想从中向调度程序添加作业并运行它。

这是我的调度程序代码,

# appsched.py
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
scheduler.start()

这是我的工作制作人脚本,

# jobproducer.py
from appsched import scheduler

def say_hello_job():
print "Hello"

scheduler.add_job(say_hello_job, 'interval', minutes=1)

不用说,这行不通。有没有办法通过使用工作商店来完成这项工作? 如何与多个不同的作业生产者共享一个调度器?

最佳答案

我有一个类似的问题,我的调度程序进程是 uWSGI MULE过程,并且有一个单独的应用程序,我想在其中添加新工作。

查看 BaseScheduleradd_job() 函数:

with self._jobstores_lock:
if not self.running:
self._pending_jobs.append((job, jobstore, replace_existing))
self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts')
else:
self._real_add_job(job, jobstore, replace_existing, True)

您可以看到问题所在:调度程序仅在它已经启动时才添加作业。

幸运的是,解决方案非常简单,我们应该定义我们自己的“add-job-only”调度器:

class JobAddScheduler(BlockingScheduler):
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined,
coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default',
executor='default', replace_existing=False, **trigger_args):

job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time
}
job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined)
job = Job(self, **job_kwargs)

# Add jobs to job store
with self._jobstores_lock:
self._real_add_job(job, jobstore, replace_existing, True)

return job

def start(self):
pass

def shutdown(self, wait=True):
pass

def _main_loop(self):
pass

def wakeup(self):
pass

然后我们可以立即添加 cron 作业:

jobscheduler = JobAddScheduler()
jobscheduler.add_job(...)

不要忘记配置调度程序!在我的例子中,我使用 SQLAlchemy-MySQL 后端来存储作业:

jobstores=dict(default=SQLAlchemyJobStore(url='mysql+pymsql://USER:PASSWORD@SERVER/DATABASE'))
jobscheduler.configure(jobstores=jobstores)

我不确定其他作业商店,但在我添加新作业后,我不得不调用单独调度程序进程的 wakeup() 函数来“激活”该作业。我使用 uWSGI 的信号系统实现了这一点。

关于python - APScheduler 如何在调度程序之外添加作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32373754/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com