gpt4 book ai didi

django - Celery 的 pytest 装置(celery_worker 和 celery_app)不起作用

转载 作者:行者123 更新时间:2023-12-03 15:55:31 25 4
gpt4 key购买 nike

我正在尝试为我的 Django(v. 2.2.3) 应用程序编写 Celery(v. 4.2.1) 集成测试。

周围有一堆过时的文章,但它们似乎都没有使用最新的 celery 测试文档中的内容 - https://docs.celeryproject.org/en/v4.2.1/userguide/testing.html#fixtures

似乎 Celery 带有两个用于测试的装置:celery_appcelery_worker这应该允许在测试的后台线程中实际运行 worker。

正如医生所说,我已经添加了

@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'memory://',
'result_backend': 'rpc'
}

进入我的 conftest.py
我已经包装了我的测试功能
@pytest.mark.celery_app
@pytest.mark.celery_worker

通常我把 celery 任务用
from celery import shared_task
@shared_task
def blabla(...):
pass

但我什至试图用
from myapp.celery import celery
@celery.task
def blabla():
pass

还有什么...我通过 apply_async 运行我的 celery 任务提供 eta争论。

尝试了很多方法,但 celery 固定装置不会影响事情的工作方式,任务调用转到实际的 redis 实例,并由工作人员在单独的进程中选择(如果我运行它),因此我的 assert_called随着我访问测试数据库中的对象的努力失败。

这样它就不会加载夹具。
enter image description here

这样它就不会使用指定的固定装置,因为它们应该出现在方法参数中并通过超过参数的数量来破坏它。

enter image description here

认为 Celery pytest 插件可能根本不存在,但事实并非如此 - 试图明确注册它:
enter image description here

虽然夹具可用于pytest:
enter image description here

但我已经进入了他们的源代码,添加了一些疯狂的 prints在那里,我没有看到它们被记录。

最佳答案

OP在这里,我想通了并写了一篇文章-https://medium.com/@scythargon/how-to-use-celery-pytest-fixtures-for-celery-intergration-testing-6d61c91775d9

主键:

@pytest.mark.usefixtures('celery_session_app')
@pytest.mark.usefixtures('celery_session_worker')
class MyTest():
def test(self):
assert mul.delay(4, 4).get(timeout=10) == 16

关于django - Celery 的 pytest 装置(celery_worker 和 celery_app)不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59075969/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com