gpt4 book ai didi

python - 无法导入 __init__.py 中定义的变量

转载 作者:太空宇宙 更新时间:2023-11-04 02:27:54 47 4
gpt4 key购买 nike

我正在尝试按照 http://flask.pocoo.org/docs/0.12/patterns/celery/ 中的说明进行操作所以我可以在 celery 任务中执行 flask/socketIO 操作。然而,我的目录结构有点不同,而且我在导入方面没有任何运气。

我的目录结构如下:

├── app
│   ├── __init__.py
│   ├── __pycache__
│   ├── auth.py
│   ├── ctasks.py
│   ├── helper.py
│   ├── saml.py
│   ├── socks.py
│   ├── templates
│   ├── threads.py
│   └── views.py
├── app.py
├── config.py
├── requirements.txt
└── saml
   ├── dev
   └── prod

我从 app.py 调用应用程序

from app import socketio, app

if __name__ == '__main__':
socketio.run(app, debug=True, port=443, ssl_context='adhoc')

__init__.py

from flask import Flask, request
from flask_socketio import SocketIO
from .ctasks import subtaskcaller, make_celery
from .helper import wait_to_finish

async_mode = None

app = Flask(__name__)
app.config.from_object('config')
socketio = SocketIO(app, async_mode=async_mode)
cel = make_celery(app)

from .auth import SamlManager
saml_manager = SamlManager()
saml_manager.init_app(app)
from app import views, socks, saml, helper, ctasks

ctasks.py

from celery import Celery
from config import *
from .helper import wait_to_finish, emitter
import time
from app import cel


def make_celery(app):
c = Celery(app.import_name, backend=CELERY_RESULT_BACKEND, broker=CELERY_BROKER_URL)
c.conf.update(app.config)
taskbase = c.Task

class ContextTask(taskbase):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return taskbase.__call__(self, *args, **kwargs)

c.Task = ContextTask
return c

@cel.task(name='tasks.tester', serializer='pickle')
def tester():
emitter('emit from subsubtask')
for i in range(1, 50):
time.sleep(1)
print('test {0}'.format(i))
x = True
return x

@cel.task(name='task.subtaskcaller', serializer='pickle')
def subtaskcaller():
emitter('emit from subtask')
finished = tester.delay()
wait_to_finish(finished)
return finished

尝试从 ctasks.py 中的应用程序导入 cel 时出现错误:

ImportError: 无法导入名称“cel”

最佳答案

在您的__init__.py 中,您只有cel。您的 __init__ 文件中没有名为 celery 的对象,因此无法将其导入其他文件。您可以尝试 from app import cel

编辑:

__init__from .ctasks import subtaskcaller, make_celery

但在 ctasks 中,您从应用程序导入 cel(当时还不存在,当时只有 Flask、request 和 SocketIO 存在) .

因此您需要将您的@cel 修饰函数放在另一个脚本中,您可以将其一直导入到__init__

的底部

关于python - 无法导入 __init__.py 中定义的变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49909363/

47 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com