gpt4 book ai didi

python - flask celery 任务不起作用

转载 作者:太空狗 更新时间:2023-10-30 01:28:21 30 4
gpt4 key购买 nike

我配置了我的项目,引用这个答案: How to use Flask-SQLAlchemy in a Celery task

我的 extension.py 文件:

import flask
from flask.ext.sqlalchemy import SQLAlchemy
from config import BaseConfig
from celery import Celery
from flask_mail import Mail
from celery.schedules import crontab


class FlaskCelery(Celery):

def __init__(self, *args, **kwargs):

super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()

if 'app' in kwargs:
self.init_app(kwargs['app'])

def patch_task(self):
TaskBase = self.Task
_celery = self

class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

self.Task = ContextTask

def init_app(self, app):
self.app = app
self.config_from_object(app.config)


mail = Mail()
db = SQLAlchemy()
settings = BaseConfig()
celery = FlaskCelery()

然后在我的 app_settings.py 中,我创建了这个应用程序:

app = Flask('app', instance_relative_config=True)

和配置的 celery :

celery.init_app(app)

我用 python manage.py run 运行了 flask 项目:

app.run(
debug=settings.get('DEBUG', False),
host=settings.get('HOST', '127.0.0.1'),
port=settings.get('PORT', 5000)
)

然后运行 celery :

celery -A manage.celery worker --beat -l debug

celery 原木看起来不错:

[tasks]
. app.api.tasks.spin_file
. app.main.tasks.send_async_email
. celery.backend_cleanup
. celery.chain
...

然后在 views.py 中,我调用这个任务:

send_async_email.delay(*args, **kwargs)

但所有任务都被 Celery 忽略了。没有任何反应,没有错误,没有警告。没有。我做错了什么?

编辑:当我使用此命令启动 celery 时:celery -A manage.celery worker --beat -l debug

我收到以下警告:

[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'.
Please make sure you give each node a unique nodename using the `-n` option.
pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),

最佳答案

我不确定这是否对您有帮助,但每当我需要 celery 时,我都会在我的许多项目中使用此代码:

from flask import Flask, request, jsonify as jsn
from celery import Celery
app = Flask(__name__)
app.config.update(dict(
SECRET_KEY='blabla'
)
)
# Celery configuration
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'database'
app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db'
app.config['CELERY_TRACK_STARTED'] = True
app.config['CELERY_SEND_EVENTS'] = True

# Initialize Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def do_something(data):

from celery import current_task
import os
import subprocess
with app.app_context():
#run some bash script with some params in my case

然后我通过以下方式与主管一起运行 celery :

#!/bin/bash
cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file

当然在我的 route 我有类似的东西

@app.route('/someroute', methods=["POST"])
def someroute():
result = do_something.delay(data)
print result.id

关于python - flask celery 任务不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32667208/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com