- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章celery4+django2定时任务的实现代码由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
网上有很多celery + django实现定时任务的教程,不过它们大多数是基于djcelery + celery3的; 。
或者是使用django_celery_beat配置较为繁琐的.
显然简洁而高效才是我们最终的追求,而celery4已经不需要额外插件即可与django结合实现定时任务了,原生的celery beat就可以很好的实现定时任务功能.
当然使用原生方案的同时有几点插件所带来的好处被我们放弃了:
Celery定时任务配置 。
在进行配置前先来看看项目结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
.
├── linux_news
│ ├── celery.py
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── manage.py
├── news
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ ├── models
│ ├── tasks.py
│ ├── tests.py
│ └── views
└── start-celery.sh
|
其中news是我们的app,用于从一些rss订阅源获取新闻信息,linux_news则是我们的project。我们需要关心的主要是 celery.py , settings.py , tasks.py 和 start-celery.sh .
首先是celery.py,想让celery执行任务就必须实例化一个celery app,并把settings.py里的配置传入app:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import
os
from
celery
import
Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault(
'DJANGO_SETTINGS_MODULE'
,
'linux_news.settings'
)
app
=
Celery(
'linux_news'
)
# 'django.conf:settings'表示django,conf.settings也就是django项目的配置,celery会根据前面设置的环境变量自动查找并导入
# - namespace表示在settings.py中celery配置项的名字的统一前缀,这里是'CELERY_',配置项的名字也需要大写
app.config_from_object(
'django.conf:settings'
, namespace
=
'CELERY'
)
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
|
配置就是这么简单,为了能在django里使用这个app,我们需要在__init__.py中导入它:
1
|
from
.celery
import
app as celery_app
|
然后我们来看tasks.py,它应该位于你的app目录中,前面我们配置了自动发现,所以celery会自动找到这些tasks,我们的tasks将写在这一模块中,代码涉及了一些orm的使用,为了契合主题我做了些精简:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
from
linux_news.celery
import
celery_app as app
from
.models
import
*
import
time
import
feedparser
import
pytz
import
html
@app
.task(ignore_result
=
True
)
def
fetch_news(origin_name):
"""
fetch all news from origin_name
"""
origin
=
get_feeds_origin(origin_name)
feeds
=
feedparser.parse(origin.feed_link)
for
item
in
feeds[
'entries'
]:
entry
=
NewsEntry()
entry.title
=
item.title
entry.origin
=
origin
entry.author
=
item.author
entry.link
=
item.link
# add timezone
entry.publish_time
=
item.time.replace(tzinfo
=
pytz.utc)
entry.summary
=
html.escape(item.summary)
entry.save()
@app
.task(ignore_result
=
True
)
def
fetch_all_news():
"""
这是我们的定时任务
fetch all origins' news to db
"""
origins
=
NewsOrigin.objects.
all
()
for
origin
in
origins:
fetch_news.delay(origin.origin_name)
|
tasks里是一些耗时操作,比如网络IO或者数据库读写,因为我们不关心任务的返回值,所以使用 @app.task(ignore_result=True) 将其屏蔽了.
任务配置完成后我们就要配置celery了,我们选择redis作为任务队列,我强烈建议在生产环境中使用rabbitmq或者redis作为任务队列或结果缓存后端,而不应该使用关系型数据库:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# redis
REDIS_PORT
=
6379
REDIS_DB
=
0
# 从环境变量中取得redis服务器地址
REDIS_HOST
=
os.environ.get(
'REDIS_ADDR'
,
'redis'
)
# celery settings
# 这两项必须设置,否则不能正常启动celery beat
CELERY_ENABLE_UTC
=
True
CELERY_TIMEZONE
=
TIME_ZONE
# 任务队列配置
CELERY_BROKER_URL
=
f
'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_ACCEPT_CONTENT
=
[
'application/json'
, ]
CELERY_RESULT_BACKEND
=
f
'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_TASK_SERIALIZER
=
'json'
|
然后是我们的定时任务设置:
1
2
3
4
5
6
7
|
from
celery.schedules
import
crontab
CELERY_BEAT_SCHEDULE
=
{
'fetch_news_every-1-hour'
: {
'task'
:
'news.tasks.fetch_all_news'
,
'schedule'
: crontab(minute
=
0
, hour
=
'*/1'
),
}
}
|
定时任务配置对象是一个dict,由任务名和配置项组成,主要配置想如下:
至此,配置celery beat的部分就结束了.
启动celery beat 。
配置完成后只需要启动celery了.
启动之前配置一下环境。不要用root运行celery!不要用root运行celery!不要用root运行celery!重要的事情说三遍.
start-celery.sh:
1
2
3
|
export
REDIS_ADDR=127.0.0.1
celery -A linux_news worker -l info -B -f
/path/to/log
|
-A 表示app所在的目录,-B表示启动celery beat运行定时任务.
celery正常启动后就可以通过日志来查看任务是否正常运行了:
[2018-12-21 13:00:00,022: INFO/MainProcess] Received task: news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] [2018-12-21 13:00:00,046: INFO/MainProcess] Received task: news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] [2018-12-21 13:00:00,051: INFO/ForkPoolWorker-2] Task news.tasks.fetch_all_news[e4566ede-2cfa-4c19-b2f3-0c7d6c38690d] succeeded in 0.02503809699555859s: None [2018-12-21 13:00:00,052: INFO/MainProcess] Received task: news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] [2018-12-21 13:00:00,449: INFO/ForkPoolWorker-5] Task news.tasks.fetch_news[c61a3e55-dd3c-4d49-8d6d-ca9b1757db25] succeeded in 0.39487219898728654s: None [2018-12-21 13:00:00,606: INFO/ForkPoolWorker-3] Task news.tasks.fetch_news[583e96dc-f508-49fa-a24a-331e0c07a86b] succeeded in 0.5523456179944333s: None 。
以上就是celery4运行定时任务的内容,希望对大家的学习有所帮助,也希望大家多多支持我.
原文链接:http://www.cnblogs.com/apocelipes/p/10156224.html 。
最后此篇关于celery4+django2定时任务的实现代码的文章就讲到这里了,如果你想了解更多关于celery4+django2定时任务的实现代码的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我对 Python-Django 和 web 开发还很陌生,我被困在这个使用 POST 创建新资源的特殊问题上。 我正在为 REST API 使用 Django REST 框架,我正在尝试创建一个新资
我已经使用 Django-storages 成功地将 Word 文档存储到 S3。 class Document(TitleSlugDescriptionModel, TimeStampedModel
我有 2 个关于模型代理的问题, 如何从模型对象创建代理对象? 如何从模型查询集创建代理查询集? 例如,假设我们定义了: from django.contrib.auth.models import
我想编写一个直接执行 HTTP 请求的单元测试(而不是使用 django.test.client.Client)。 如果您好奇为什么 - 那是因为我想测试我从 Django 应用程序公开的 Thrif
我为我的个人网站启动了一个 django 项目来学习 django。到目前为止,我已经将我的开发环境设置为我需要的一切,并遵循 this很棒的教程来创建一些基本的数据结构和模板。现在我想开始使用我之前
我已经阅读了很多关于如何在使用 Django 注册时添加额外字段的信息,例如 here 、 here 和 here 。代码片段是: forms.py(来自注册应用程序) class Registrat
我正在编写小型社交应用程序。功能之一是在网站标题中写入用户名。因此,例如,如果我登录并且我的名字是Oleg(用户名),那么我应该看到: Hello, Oleg | Click to edit prof
我有一个使用 Django 和 Django Rest 框架开发的应用程序。我想将 django-reversion 功能添加到我的应用程序中。 我已经尝试过http://django-reversi
我有一个简单的 HTML 表单,我没有使用 Django 表单,但现在我想添加一个选择。 选择最容易创建为 Django ChoiceField (与通过循环等手动创建选择相反),但是,如果没有在 D
我不明白为什么人们以两种方式编写外键,这样做的目的是什么?它们是相同还是不同? 我注意到有些人这样写: author = models.ForeignKey(Author, on_delete=mod
我想在我的 Django 应用程序中获取评论最多的十个帖子,但我做不到,因为我想不出合适的方法。 我目前正在使用 django 评论框架,并且我已经看到使用 aggregate or annotate
这对于 Django 1.2 仍然有效吗? Custom Filter in Django Admin on Django 1.3 or below 我已经尝试过,但管理类中的 list_filter
问题在于,当 django-compressor 编译为 .js 文件的 CoffeeScript 文件中引用 {{ STATIC_URL }} 时,它无法正确加载。 在我的 django 模板中,我
我正在尝试将一些字段从一个 django 模型移动到一个新模型。假设我有一个书籍模型: class Book(models.Model): title = models.CharField(max
我想在我的 Django 应用程序中获取评论最多的十个帖子,但我做不到,因为我想不出合适的方法。 我目前正在使用 django 评论框架,并且我已经看到使用 aggregate or annotate
目前我正在寻找在 Django 中实现访问控制。我已经阅读了有关内置权限的内容,但它并不关心每个对象的基础。例如,我想要“只有创建者可以删除自己的项目”之类的权限。所以我读到了 django-guar
嗨,我正在将我的 Django 模型的一个字段的值设置为其他模型的另一个字段的值。这个值应该是动态变化的。 这是我的第一个模型 class MainModel(AbstractBaseUser, Pe
我正在尝试为我的模型创建一个编辑表单。我没有使用模型表单,因为根据模型类型,用户可以使用不同的表单。 (例如,其中一个表单有 Tinymce 小部件,而另一个没有。) 有没有什么方法可以使用模型设置表
Django 模板中的搜索字段 如何在类似于此图像的 Django 模板中创建搜索字段 http://asciicasts.com/system/photos/1204/original/E354I0
根据 Django documentation ,如果 Django 安装激活了 AuthenticationMiddleware,HttpRequest 对象有一个“user”属性代表当前登录的用户
我是一名优秀的程序员,十分优秀!