gpt4 book ai didi

python - APScheduler 作业未按计划启动

转载 作者:太空宇宙 更新时间:2023-11-03 14:29:42 25 4
gpt4 key购买 nike

我正在尝试安排一项工作每分钟开始一次。我在 scheduler.py 脚本中定义了调度程序:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 5
}

scheduler = BackgroundScheduler(executors=executors,job_defaults=job_defaults)

我在模块的 __init__.py 中初始化调度程序,如下所示:

from scheduler import scheduler

scheduler.start()

我想针对特定操作启动计划作业,如下所示:

def AddJob():
dbid = repository.database.GetDbid()
job_id = 'CollectData_{0}'.format(dbid)
scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)

def TestScheduler():
for i in range(0,29):
starttime = time()
print "test"
sleep(1.0 - ((time() - starttime) % 1.0))

首先:当我在 python 控制台中执行 AddJob() 函数时,它开始按预期运行,但不是在后台运行,控制台会被阻止,直到 TestScheduler code> 函数在 30 秒后结束。我希望它在后台运行,因为它是一个后台调度程序。
第二:即使指定 1 分钟的重复间隔,作业也不会再次启动。

我错过了什么?

更新

由于另一个线程,我发现了这个问题。错误的行是这样的:

scheduler.scheduled_job(func=TestScheduler(),
trigger='interval',
minutes=1,
id=job_id
)

我把它改为:

scheduler.add_job(func=TestScheduler,
trigger='interval',
minutes=1,
id=job_id
)

TestScheduler() 变为 TestScheduler。使用 TestScheduler() 会导致函数 TestScheduler() 的结果作为 add_job() 的参数传递。

最佳答案

第一个问题似乎是您正在 __init__.py 内初始化调度程序,这似乎不是推荐的方式。
第一次导入特定文件夹中的模块时,将执行 __init__.py 中存在的代码。例如,想象一下这个结构:

my_module
|--__init__.py
|--test.py

__init__.py:

from scheduler import scheduler

scheduler.start()

from my_module import Something时,scheduler.start()命令被执行。因此,它要么根本不从 __init__.py 启动,要么启动多次(取决于代码的其余部分!)。

另一个问题肯定是使用 scheduler.scheduled_job() 方法。如果您阅读文档on adding jobs ,您会发现推荐的方法是使用 add_job() 方法,而不是使用 scheduled_job() ,这是为了方便起见的装饰器。

我建议这样:

  1. 保持 my_scheduler.py 不变。
  2. __init__.py 中删除 scheduler.start() 行。
  3. 按如下方式更改主文件:

    from my_scheduler import scheduler

    if not scheduler.running: # Clause suggested by @CyrilleMODIANO
    scheduler.start()

    def AddJob():
    dbid = repository.database.GetDbid()
    job_id = 'CollectData_{0}'.format(dbid)
    scheduler.add_job(
    func=TestScheduler,
    trigger='interval',
    minutes=1,
    id=job_id
    )

    ...

关于python - APScheduler 作业未按计划启动,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47389598/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com