gpt4 book ai didi

django - 如何模拟 celery 任务的绑定(bind)上下文

转载 作者:行者123 更新时间:2023-12-03 03:23:56 24 4
gpt4 key购买 nike

如何模拟绑定(bind)上下文,或模拟 celery 任务 ID?

给定一个 celery 任务,例如:

helpers.py:

from task import some_task

def some_helper():
some_task.delay(123)

在任务.py中:

@app.task(queue="abc", bind=True)
def some_task(self, some_number: int):
print(self.id) # how to mock this attribute access?

简单的测试用例:

from django.test.testcases import TestCase
from helpers import some_helper


class SomeTest(TestCase):

def test_some_helper(self):
some_helper()

我尝试过:

 @patch("celery.app.base.Celery.task", return_value=lambda x: x)

我也尝试过:

class MockResult(dict):
def __getattr__(self, x):
return self[x]

...
def test_some_task(self):
cls = MockResult({"id": "asdf"})
bound_some_task = some_task.__get__(cls, MockResult)
bound_some_task(123)

相关:

最佳答案

给定一个 celery 任务,如下所示:

@my_celery_app.task(bind=True)
def my_task(self):
if self.request.retries == 1:
my_method_to_invoke()
# Do work for first retry
elif self.request.retries == 2:
# Do work for second retry
# do work for main task

测试可以通过模拟 celery 中的基本 Task.request 类属性来设置 self.request.retries

在单元测试中可以完成以下操作

@patch("path.to.task.my_method_to_invoke")
@patch("celery.app.task.Task.request")
def my_test_method(self, mock_task_request, mock_my_method_to_invoke):

# Set the value of retries directly
mock_task_request.retries = 1

# Call the task and assert the inside method was
# called
my_task()

mock_my_method_to_invoke.assert_called_once()

可以对任务上的 id 执行相同的操作。我找到了这个答案,寻找如何在绑定(bind)的 celery 任务上模拟 self

关于django - 如何模拟 celery 任务的绑定(bind)上下文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55820146/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com