gpt4 book ai didi

Python Celery - 如何在其他任务中调用 celery 任务

转载 作者:太空狗 更新时间:2023-10-29 22:22:38 25 4
gpt4 key购买 nike

我在 Django-Celery 的任务中调用任务

这是我的任务。

@shared_task
def post_notification(data,url):
url = "http://posttestserver.com/data/?dir=praful" # when in production, remove this line.
headers = {'content-type': 'application/json'}
requests.post(url, data=json.dumps(data), headers=headers)


@shared_task
def shipment_server(data,notification_type):
notification_obj = Notification.objects.get(name = notification_type)
server_list = ServerNotificationMapping.objects.filter(notification_name=notification_obj)

for server in server_list:
task = post_notification.delay(data,server.server_id.url)
print task.status # it prints 'Nonetype' has no attribute id

如何在任务中调用任务?我在某处读到它可以使用 group 完成,但我无法形成正确的语法。我该怎么做?

我试过了

for server in server_list:
task = group(post_notification.s(data, server.server_id.url))().get()
print task.status

抛出一个警告说

TxIsolationWarning: Polling results w│                                                                        
ith transaction isolation level repeatable-read within the same transacti│
on may give outdated results. Be sure to commit the transaction for each │
poll iteration. │
'Polling results with transaction isolation level '

不知道这是什么!!!

我该如何解决我的问题?

最佳答案

这应该有效:

celery.current_app.send_task('mymodel.tasks.mytask', args=[arg1, arg2, arg3])

关于Python Celery - 如何在其他任务中调用 celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21829868/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com