- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章python 基于Apscheduler实现定时任务由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
在工作场景遇到了这么一个场景,就是需要定期去执行一个缓存接口,用于同步设备配置。首先想到的就是Linux上的crontab,可以定期,或者间隔一段时间去执行任务。但是如果你想要把这个定时任务作为一个模块集成到Python项目中,或者想持久化任务,显然crontab不太适用。Python的APScheduler模块能够很好的解决此类问题,所以专门写这篇文章,从简单入门开始记录关于APScheduler最基础的使用场景,以及解决持久化任务的问题,最后结合其他框架深层次定制定时任务模块这几个点入手.
先简单介绍一下Apscheduler模块包含的四种组件:
大概了解了Apscheduler包含的几种概念,现在先来看一下一个简单的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# -*- coding: utf-8 -*-
from
apscheduler.schedulers.blocking
import
BlockingScheduler
import
time
def
hello():
print
(time.strftime(
"%c"
))
if
__name__
=
=
"__main__"
:
scheduler
=
BlockingScheduler()
scheduler.add_job(hello,
'interval'
, seconds
=
5
)
scheduler.start()
|
示例的输出:
1
2
3
4
5
6
|
Thu Dec 3 16:01:20 2020
Thu Dec 3 16:01:25 2020
Thu Dec 3 16:01:30 2020
Thu Dec 3 16:01:35 2020
Thu Dec 3 16:01:40 2020
..........
|
这个简单的示例,我们用上面提到几种组件分析一下运行逻辑:
内置的三种Trigger触发器类型:
常见的Scheduler调度器
常见的JobStore:
通过上面一个简单的示例了解大概的工作流程,以及各个组件在整个流程中的作用,以下的示例是Flask Web框架结合使用Apscheduler定时器,定时执行任务.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# -*- coding: utf-8 -*-
from
flask
import
Flask, Blueprint, request
from
apscheduler.executors.pool
import
ThreadPoolExecutor
from
apscheduler.schedulers.background
import
BackgroundScheduler
from
apscheduler.jobstores.redis
import
RedisJobStore
import
time
app
=
Flask(__name__)
executors
=
{
"default"
: ThreadPoolExecutor(
5
)}
default_redis_jobstore
=
RedisJobStore(db
=
2
,
jobs_key
=
"apschedulers.default_jobs"
,
run_times_key
=
"apschedulers.default_run_times"
,
host
=
'127.0.0.1'
,
port
=
6379
)
scheduler
=
BackgroundScheduler(executors
=
executors)
scheduler.add_jobstore(default_redis_jobstore)
scheduler.start()
def
say_hello():
print
(time.strftime(
"%c"
))
@app
.route(
"/get_job"
, methods
=
[
'GET'
])
def
get_job():
if
scheduler.get_job(
"say_hello_test"
):
return
"YES"
else
:
return
"NO"
@app
.route(
"/start_job"
, methods
=
[
"GET"
])
def
start_job():
if
not
scheduler.get_job(
"say_hello_test"
):
scheduler.add_job(say_hello,
"interval"
, seconds
=
5
,
id
=
"say_hello_test"
)
return
"Start Scuessfully!"
else
:
return
"Started Failed"
@app
.route(
"/remove_job"
, methods
=
[
"GET"
])
def
remove_job():
if
scheduler.get_job(
"say_hello_test"
):
scheduler.remove_job(
"say_hello_test"
)
return
"Delete Successfully!"
else
:
return
"Delete Failed"
if
__name__
=
=
"__main__"
:
app.run(host
=
"127.0.0.1"
, port
=
8787
, debug
=
True
)
|
最后总结一下,首先你要设置一个作业存储器用于在调度程序崩溃重新恢复时,还能够在作业存储器中获取到作业继续执行;然后你需要设置一个执行器,这个根据作业的类型,比如时一个CPU密集型的任务,那就可以用进程池执行器,默认是用线程池执行器;最后创建配置调度器,启动调度,可以在启动前添加作业,也可以在启动后添加,删除,获取作业。(在这里需要明白的一点就是应用程序不会直接去操作作业存储器,作业或者执行器,而是调度器提供适当的接口来处理这些接口。) 。
ApScheduler是一个不错的定时任务库,能够动态的添加删除,同时也支持不同的触发器类型,这也是它的优势,相反一些如果是静态任务,其实可以用如linux的crontab工具去做定时任务。有关这方面的记录还会持续更新,如果有什么问题,可以提出来,大家一起探讨.
以上就是python Apscheduler的使用方法的详细内容,更多关于python Apscheduler的资料请关注我其它相关文章! 。
原文链接:https://www.cnblogs.com/linshukai/p/14097079.html 。
最后此篇关于python 基于Apscheduler实现定时任务的文章就讲到这里了,如果你想了解更多关于python 基于Apscheduler实现定时任务的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我有一些经常运行或持续一段时间的重复性工作。 似乎 Scheduler().get_jobs() 只会返回当前未运行的计划作业列表,因此我无法确定具有特定 id 的作业是否不存在或实际正在运行。 在这
我不知道为什么会收到此错误:ImportError: No module named 'apscheduler'。 我尝试安装旧版本:sudo pip uninstall apscheduler 然后
我有一个正在运行的 Django 项目及其数据库和模型表。现在,我想使用 Django 数据库连接集成 APScheduler 并将 APScheduler 作业保存在与其他表相同的数据库中。 我已阅
我是 APScheduler 的新手,并在在更大范围内实现之前对其进行了测试。 我已经创建了下面的代码,但是当我将 add_job 与 trigger='date' 一起使用时,我的函数从未被调用过。
我有一个使用 uwsgi(有 10 个 worker )+ ngnix 运行的 django 应用程序。我正在使用 apscheduler 进行调度。每当我安排一项工作时,它都会被多次执行。从这些答案
调度程序在生产中运行良好,然后突然关闭。显然,DB 可能已经离线了一段时间(网络应用程序从未错过任何一个节拍,因此它是短暂的)。 日志报告... [2019-11-25 07:59:14,907: I
我正在使用 APScheduler(3.5.3) 来运行三个不同的作业。我需要在完成第一份工作后立即触发第二份工作。另外我不知道第一份工作的完成时间。我将触发器类型设置为 cron 并计划每 2 小时
我们正在运行 Azure Kubernetes 集群(DEV、TST、PRD 上每个阶段一个),我们需要定期运行多个 Python 脚本,这就是我们使用 APScheduler (3.6.0) 的目的
目前,我正在使用 APScheduler 每 10 分钟自动执行一次脚本。它在 print("[+] Success! Bot Starting!") 之后停止执行,它不会输出错误。我建议我的 Sch
我在程序中使用 APScheduler 在特定日期时间安排作业。但是,我正在处理与本地时间不同时区的日期时间。所以我传递给 APScheduler 的日期时间始终是一个时区感知的日期时间... sch
我们需要在当前作业执行时动态调度多个作业。 大概的场景是: 应该有一个调度程序来每天检查应用程序表(假设在世界标准时间上午 6 点)。查找今天的日期时间为resume_dttime的用户为该用户动态安
我尝试了APS的MongoJobStore example。示例代码为: import logging import os import sys from datetime import dateti
我在脚本中使用模块APScheduler,我使用BlockingScheduler。我有一些定期的工作。如果此作业引发一个异常,无论我在try中期望它还是让它传播,我的线程都会这样做不返回。然后我达到
我希望每天早上 6 点启动我的主要任务。但出于测试目的,我将间隔设置为 5 秒。问题是它似乎永远不会开火。我在 maintask 方法中有一个从未到达的断点,并且没有任何内容打印到控制台。我假设它没有
我通过 apscheduler 安排了作业。到目前为止,我有 3 份工作,但很快就会有更多。我正在寻找一种方法来扩展我的代码。 目前,每个作业都是它自己的.py文件,在该文件中,我将脚本变成了一个以r
我在 django 中运行了 apscheduler,它似乎可以正常工作……好吧。在我的项目 init.py 中,我初始化了调度程序: scheduler = Scheduler(daemon=Tru
我正在尝试为 Github 上的一个项目做出贡献,以收集财务信息 数据。 代码.. # time_keeper.py from apscheduler.scheduler import
[Python 3.5.2, APScheduler 3.3.1] APScheduler 启动了一些线程,我想知道为什么。 这是我正在执行的代码(在 PyCharm 中,我还可以在其中绘制线程图):
我当前正在尝试设置调度程序(使用 apscheduler),但在添加作业时失败: from apscheduler.schedulers.blocking import BlockingSchedul
我正在 redis 中添加作业,完成作业后我添加了一个事件处理程序。在事件处理程序中,我返回值,根据该值我从作业库中删除作业 ID。它被成功删除,但立即抛出异常。 代码 from datetime i
我是一名优秀的程序员,十分优秀!