gpt4 book ai didi

python - 在 Flask 蓝图中导入 Celery

转载 作者:行者123 更新时间:2023-12-04 00:59:43 26 4
gpt4 key购买 nike

我有一个具有 MVC 结构的 Flask 应用程序:

my_app
├── server.py
├── requirements.txt
├── models
│ ├── __init__.py
└── model.py
├── controllers
├── __init__.py
├── client_controllers
└──controller.py
└── another_controller.py
└── templates

我使用蓝图在“ Controller ”中拆分服务器代码,所以我有这样的事情:

服务器.py:
from flask import Flask
from celery import Celery

from controllers.client_controllers.controller import controller

app = Flask(__name__)
app.secret_key = 'SECRET'

app.register_blueprint(controller)

# Celery Configuration
def make_celery(app):

celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery

app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


if __name__ == "__main__":
app.run(host='0.0.0.0', debug=True)

Controller .py:
from flask import Blueprint, render_template, json, request, redirect, url_for, abort, session



controller = Blueprint('controller', __name__,
template_folder='templates/')

@celery.task()
def add_together(a, b):
return a + b

@controller.route('/add', methods=['GET'])
def add():
result = add_together.delay(23, 42)
result.wait()
return 'Processing'

您可能会注意到,celery 没有导入到 Controller 中,因为我不知道如何从 导入 celery 实例。 server.py 进入我的 Controller .py 没有收到错误,我一直在尝试:
from ...server import celery
from ..server import celery
...etc

但仍然因错误而失败。

最佳答案

flask 错误 RuntimeError: Working outside of application context.发生是因为您不在 Flask 中 application_context() .

您应该使用 celery shared_task鉴于您的 MVC 结构,这就是您所需要的。

celery_flask/
├── celery_tasks
│   ├── app_tasks.py
│   ├── __init__.py
├── celery_worker.py
├── controllers
│   ├── __init__.py
│   ├── some_controller.py
├── __init__.py
└── server.py


脚本 app_tasks.py
#=====================
# app_tasks.py
#=====================
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task(name='celery_tasks.add_together')
def add_together(x, y):
return x + y

@shared_task装饰器返回一个始终指向事件 Celery 实例的代理:
>>> from celery import Celery, shared_task
>>> @shared_task
... def add_together(x, y):
... return x + y
...
>>> app1 = Celery(broker='amqp://')
>>> add_together.app is app1
True
>>> app2 = Celery(broker='redis://')
>>> add_together.app is app2
True

定义任务后,您可以使用对 Celery 应用程序的引用来调用它们。这个 celery 应用程序可能是 flask 的一部分 application_context() .例子:

脚本 server.py
from __future__ import absolute_import
from flask import Flask
from celery import Celery

from controllers.some_controller import controller

flask_app = Flask(__name__)
flask_app.secret_key = 'SECRET'

flask_app.register_blueprint(controller)

# Celery Configuration
def make_celery( app ):
celery = Celery('flask-celery-app', backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'],
include=['celery_tasks.app_tasks'])
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery

def list_celery_task( ):
from celery.task.control import inspect
i = inspect()
i.registered_tasks()
from itertools import chain
t = set(chain.from_iterable( i.registered_tasks().values() ))
print "registered_tasks={}".format( t )
#======================================
# MAIN
#======================================
flask_app.config.update(
CELERY_BROKER_URL='redis://localhost:6379',
CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)
flask_app.celery = celery
list_celery_task( )


if __name__ == "__main__":
flask_app.run(host='0.0.0.0', debug=True)

脚本 some_controller.py
#============================
# some_controller.py
#============================
from __future__ import absolute_import
from flask import Blueprint
from flask import current_app

controller = Blueprint('controller', __name__,
template_folder='templates/')

@controller.route('/add', methods=['GET'])
def add():
print "calling add"
result = current_app.celery.send_task('celery_tasks.add_together',args=[12,6])
r = result.get()
print 'Processing is {}'.format( r )
return 'Processing is {}'.format( r )

最后,启动worker来消费任务:
celery -A celery_worker worker --loglevel=DEBUG

脚本 celery_worker.py
#============================
# celery_worker.py
#============================
from __future__ import absolute_import
from celery import Celery

# Celery Configuration
def make_celery():
celery = Celery('flask-celery-app', backend='redis://localhost:6379',
broker='redis://localhost:6379',
include=['celery_tasks.app_tasks'])
return celery

celery = make_celery()
print "tasks={}".format( celery.tasks.keys() )

关于python - 在 Flask 蓝图中导入 Celery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59632556/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com