- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
Celery异步任务框架如何使用?看这里
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度
# 官网解释
"""
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
"""
"""
1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(task result store)组成。
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
安装:pip install celery
消息中间件:RabbitMQ/Redis
app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)
注意如果是windows平台还需要安装:pip install eventlet
# 第一步:定义一个py文件(名字随意,celery_task)
"""celery_task.py"""
from celery import Celery
backend = 'redis://127.0.0.1:6379/1' # 结果存储
broker = 'redis://127.0.0.1:6379/2' # 消息中间件
app = Celery(__name__,broker=broker,backend=backend) # __name__区分__main__
# 被它修饰,就变成了celery的任务
@app.task
def add(a,b):
return a+b
# 第二步:提交任务(新建一个py文件:submit_task)
"""submit_task.py"""
from celery_task import add
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)
print(res) # 2ddb35df-25f2-4f7c-8405-0bd7b1fa5645
# 第三步:任务执行单元执行,使用命令启动worker
格式:celery -A 文件名 worker -l 日志输出级别 (win平台+-P eventlet)
celery -A celery_task worker -l info -P eventlet
'''
celery_task:py文件的名字
-l info:日志输出级别是info
-P eventlet 在win平台需要下载,pip install eventlet
'''
#如果队列里有任务,就会执行,如果没有任务,worker就等在这
# 第四步:查询结果是否执行完成 get_result.py
"""get_result.py"""
from celery_task import app
from celery.result import AsyncResult
id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
asy = AsyncResult(id=id, app=app)
if asy.successful():
result = asy.get()
print(result)
elif asy.failed():
print('任务失败')
elif asy.status == 'PENDING':
print('任务等待中被执行')
elif asy.status == 'RETRY':
print('任务异常后正在重试')
elif asy.status == 'STARTED':
print('任务已经开始被执行')
随便定义包名,但是包内必须要有celery.py
步骤:
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务
from celery_task.add_task import add
from celery_task.celery import app
res=add.delay(100,200)
print(res)
# 提交任务delay在任意位置提交就可以,只需将celery任务导过来即可
scripts> celery -A celery_task worker -l info -P eventlet
from celery_task import app
from celery.result import AsyncResult
id = '2ddb35df-25f2-4f7c-8405-0bd7b1fa5645'
if __name__ == '__main__':
asy = AsyncResult(id=id, app=app)
if asy.successful():
result = asy.get()
print(result)
elif asy.failed():
print('任务失败')
elif asy.status == 'PENDING':
print('任务等待中被执行')
elif asy.status == 'RETRY':
print('任务异常后正在重试')
elif asy.status == 'STARTED':
print('任务已经开始被执行')
异步任务,延迟任务,定时任务
上面的示例就是
# 其他不变,提交任务的时候,如下:
from celery_task.user_task import add
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10)
# 参数传递需要使用args,传时间要使用时间对象eta,使用的是utc时间
mul.apply_async(args=(20, 50), eta=eta)
定时任务需要启动beat
和worker
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.add_task'])
# include内写app管理的任务
# 时区
app.conf.timezone='Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc=False
#第一步:在celery.py中配置
# celery任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'''
任务名:{
task:任务
schedule:时间
args:参数(函数参数)
}
'''
'task-mul': {
'task': 'celery_task.user_task.mul',
'schedule': timedelta(seconds=3), # 3s后
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (3, 15),
},
'task-add': {
'task': 'celery_task.home_task.add',
'schedule': timedelta(seconds=10), # 10s后
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (3, 5),
},
}
#第二步:启动beat(beat负责定时提交任务)
celery -A celery_task beat -l info
# 第三步:启动worker,任务就会被worker执行了
celery -A celery_task worker -l info -P eventlet
当我运行此命令进行 celery 节拍时。 [2013-06-27 02:17:05,936: INFO/MainProcess] Celerybeat: Starting... [2013-06-2
我需要构建一个处理两种类型任务的系统。一种类型可以创建更多自身或另一种类型的任务。将有很少的 worker (2-3)和只有一个主机。最重要的要求是系统应该优雅地处理重新启动:即在重新启动时,正在进行
我们使用 Celery 4.2.1 和 Redis,并为我们的任务设置了全局软超时和硬超时。我们所有的自定义任务都设计为保持在限制范围内,但每天内置任务 backend_cleanup 任务最终都会因
我知道这违背了使用 Celery 的全部目的,但是是否有一个函数会阻塞直到结果返回? 所以我可以调用 actual_result = MyTask.dont_delay(some_arg="foo")
我们使用 Celery 4.2.1 和 Redis,并为我们的任务设置了全局软超时和硬超时。我们所有的自定义任务都设计为保持在限制范围内,但每天内置任务 backend_cleanup 任务最终都会因
我知道这违背了使用 Celery 的全部目的,但是是否有一个函数会阻塞直到结果返回? 所以我可以调用 actual_result = MyTask.dont_delay(some_arg="foo")
我计划使用 celery 作为我的项目的任务管理组件。它几乎具有我的项目所需的所有功能。我将有一组可以独立执行或按指定顺序执行的任务。在顺序任务中,我希望能够在中间任务之一失败时执行清理/回滚。我想知
它是运行 Celery 的实际处理器还是另一个进程?在花中,我可以在工作池中看到多个进程吗?这两者之间有什么区别? 最佳答案 当您运行 celery worker 时,它会创建一个父进程来管理正在运行
我有一个名为 ShippingApp 的项目,我按照步骤设置了 celery worker。我将 celery 3.1.26.post2 与 python3.7 一起使用,当我想启动 Celery W
尽我所能,我无法杀死这些 celery worker 。 我跑: celery --app=my_app._celery:app status 我看到我有3个(我不明白为什么3个 worker = 2
我在 docker 容器中运行了 celery ,我想检查选项 CELERY_TASK_RESULT_EXPIRES = '3600' 是否已应用。 我尝试使用 celery inspect conf
我使用 celery.chord(...) 创建一组任务和一个方法,该方法在组中的所有任务完成后被调用。 我使用 amqp 结果后端(但我想切换到 memcached)。 我的 worker 每秒钟一
我正在寻找一些关于将任务生成的列表映射到 celery 中的另一个任务的最佳方法的建议。 假设我有一个名为 parse 的任务,它解析 PDF 文档并输出页面列表。然后,每个页面都需要单独传递给另一个
这不是关于如何捕获 celery worker 日志的问题。有什么方法可以捕获生产者上的 celery 日志记录。我想要的是捕获当我调用 task.delay(...) 或 task.apply_as
我正在使用以下版本: 花==0.9.3 celery ==4.3.0 这为我提供了包含多个列的任务页面的以下显示: 每次我进入这个页面时,我最终都会重新排列页面,使列的顺序不同,并将行的顺序更改为降序
我想完成这样的事情: results = [] for i in range(N): data = generate_data_slowly() res = tasks.process
我想运行一个由beat 调度的复杂任务。让我们假设定义了默认的 add/mul 任务。 @app.on_after_configure.connect def setup_periodic_tasks
我有一个应用程序,其中包含 celery worker 。当我部署这将杀死那些正在运行的进程。 所以任务将开始,但永远不会完成,并且在部署完成时不会重新启动。 避免此问题并在部署完成后重新启动这些任务
我正在开始使用 Celery 进行 Django 项目。出于本地开发目的,我根据这些说明使用 djcelery 和 djkombu(数据库传输)进行了设置 http://ask.github.com/
如何配置 celery 在任务失败时发送电子邮件警报? 例如,我希望 Celery 在 3 个以上的任务失败或 10 个以上的任务被重试时通知我。 是否可以使用 celery 或实用程序(例如花),或
我是一名优秀的程序员,十分优秀!