
python - Mocking the Celery task method apply_async

Reposted. Author: 行者123. Last updated: 2023-11-30 23:00:26

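I have created a Celery task that depends on other modules. The task is scheduled with apply_async from the model's save method, and I want all my test cases to pass without Celery running. My code is below.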

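def save(self, force_insert=False, force_update=False, using=None,
         update_fields=None):
    # Forward the caller's arguments instead of hard-coding the defaults.
    super(Offers, self).save(force_insert=force_insert, force_update=force_update,
                             using=using, update_fields=update_fields)

    # Schedule the status change for the moment the offer expires.
    change_offer_status.apply_async(args=[self.id], eta=self.valid_to,
                                    queue='credr_core_task',
                                    routing_key='credr_core_task')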

test.py

class OfferTests(APITestCase):
    authenticated_client = APIClient()

    def setUp(self):
        data = {
            "username": "vivek",
            "first_name": "Vivek",
            "last_name": "Dogra",
            "email": "vivk@credr.com",
            "contact_number": "9834982602",
            "password": "easy"
        }
        self.user = User.objects.create(**data)
        token = Token.objects.get(user__username='vivek')
        self.authenticated_client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)
        mock_task = Mock()
        mock_task.get = Mock(return_value={'success': True})

        print(mock_task.get())  # outputs {'success': True}

        # The patch is active only inside this with block; it is undone
        # as soon as the block exits, before any test method runs.
        with patch('offers.tasks.change_offer_status.apply_async', new=mock_task) as mocked_task:
            mocked_task.return_value = True

    @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
                       CELERY_ALWAYS_EAGER=True)
    def test_add_offer(self):
        """
        add user address
        """
        start_date = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
        end_date = datetime.strftime(datetime.now() + timedelta(days=3), "%Y-%m-%d %H:%M:%S")
        data = {
            "type": OfferTypeEnum.FLAT._value_,
            "name": "CredR Offer",
            "code": "FLAT100",
            "status": OfferStatusEnum.ACTIVE._value_,
            "value": "1004",
            "discount_value": 1004,
            "valid_from": start_date,
            "valid_to": end_date,
            "message": "GET FLAT 100"
        }
        response = self.authenticated_client.post(URL, data, format='json')
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)

I created a mock object in the setUp method, but I still get a "connection refused" error from RabbitMQ.

Best answer

I want to pass all test cases without Celery

Set CELERY_ALWAYS_EAGER = True when running the tests:

[...] tasks will be executed locally instead of being sent to the queue.
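
As a minimal sketch, eager mode could be enabled in a settings module that is loaded only for tests. The file name settings_test.py is a hypothetical choice; the two setting names are the ones already used in the question's override_settings call. Celery 4.0 and later renamed these settings (task_always_eager and task_eager_propagates), so the names should be checked against the installed version.

```python
# settings_test.py (hypothetical file name, used only when running tests)

# Run tasks locally and synchronously instead of sending them to the broker.
CELERY_ALWAYS_EAGER = True

# Re-raise exceptions from eagerly executed tasks so failing tasks fail the test.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```

With Django, such a module would typically start by importing everything from the project's base settings and then apply these overrides.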

It may also be worth reading about the other configuration options here: http://docs.celeryproject.org/en/latest/configuration.html#celery-always-eager
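
If the goal is to mock apply_async rather than run the task eagerly, one likely reason the question's mock has no effect is that a patch opened with a with block is undone when the block exits, so it is no longer active by the time the test method calls save(). The sketch below is not from the original answer. It keeps the patch alive for the whole test by calling start() in setUp and registering stop() with addCleanup. FakeTask and Offer are hypothetical stand-ins so the example runs without Celery or Django; in the real project the patch target would be the actual change_offer_status task. It is written for Python 3 (on Python 2 the separate mock package would be needed).

```python
import unittest
from unittest import mock


class FakeTask(object):
    """Hypothetical stand-in for a Celery task that needs a broker."""

    def apply_async(self, args=None, **options):
        # Simulates the "connection refused" failure seen without RabbitMQ.
        raise ConnectionError("broker unreachable")


change_offer_status = FakeTask()


class Offer(object):
    """Hypothetical minimal model whose save() schedules the task."""

    def __init__(self, id, valid_to):
        self.id = id
        self.valid_to = valid_to

    def save(self):
        change_offer_status.apply_async(args=[self.id], eta=self.valid_to)


class OfferTests(unittest.TestCase):
    def setUp(self):
        # start() activates the patch and returns the mock; the patch stays
        # active until stop() runs, which addCleanup schedules after the test.
        patcher = mock.patch.object(change_offer_status, "apply_async")
        self.mock_apply_async = patcher.start()
        self.addCleanup(patcher.stop)

    def test_save_schedules_task(self):
        Offer(id=1, valid_to="2030-01-01").save()
        # The mock recorded the call; nothing was sent to a broker.
        self.mock_apply_async.assert_called_once_with(args=[1], eta="2030-01-01")
```

Decorating the test method or the test class with patch would have the same effect; the start/addCleanup form is shown because it matches the question's wish to set the mock up in setUp.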

Regarding "python - Mocking the Celery task method apply_async", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/35284043/
