gpt4 book ai didi

django - Python Celery group() - TypeError : [. ..] ** 之后的参数必须是一个映射,不长

转载 作者:行者123 更新时间:2023-12-04 02:01:20 26 4
gpt4 key购买 nike

我正在尝试运行一个 celery (3.1.17) 任务,该任务在一个组中执行更多任务,但我总是遇到错误。这就是我设置代码的方式:

from celery import task, group

@task
def daily_emails():

[...]

all_tasks = []

for chunk in range(0, users.count(), 1000):
some_users = users[chunk:chunk+1000]
all_tasks.append(write_email_bunch.subtask(some_users, execnum))

job = group(all_tasks)
# result = job.apply_async()
# job.get()
result = job.delay()
print result
results = result.join()
print results

print "done writing email tasks"
count = sum(results)
print count


@task
def write_email_bunch(some_users, execnum):

[...]

return len(some_users) - skipped_email_count

这是输出:

<GroupResult: 3d766c85-21af-4ed0-90cb-a1ca2d281db1 [69527252-8468-4358-9328-144f727f372b, 6d03d86e-1b69-4f43-832e-bd27c4dfc092, 1d868d1b-b502-4672-9895-430089e9532e]>
Traceback (most recent call last):
File "send_daily_emails.py", line 8, in <module>
daily_emails()
File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/app/task.py", line 420, in __call__
return self.run(*args, **kwargs)
File "/var/www/nt_dev/nt/apps/emails/tasks.py", line 124, in daily_emails
results = result.join()
File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/result.py", line 642, in join
interval=interval, no_ack=no_ack,
File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/result.py", line 870, in get
raise self.result
TypeError: write_email_bunch() argument after ** must be a mapping, not long

所以我得到了一个 GroupResult 但不知何故我无法加入它或进一步处理它。当我使用 write_email_bunch.s(some_users, execnum) 我得到这个异常:

  File "/var/www/virtualenvs/nt_dev/local/lib/python2.7/site-packages/celery/result.py", line 870, in get
raise self.result
TypeError: 'tuple' object is not callable

我将如何等待所有组任务完成后再继续?job.get() 给我这个异常:

TypeError: get expected at least 1 arguments, got 0

最佳答案

subtask 接受一个 args 元组,一个 kwargs 字典和任务选项,所以它应该这样调用:

    all_tasks.append(write_email_bunch.subtask((some_users, execnum)))

注意我们传递给它一个包含参数的元组

此外,您不应该在任务中等待任务 - 这可能会导致死锁。在这种情况下,我认为 daily_emails 不需要是一个 celery 任务——它可以是一个创建 Canvas 对象并运行异步应用的常规函数​​。

def daily_emails():

all_tasks = []

for chunk in range(0, users.count(), 1000):
some_users = users[chunk:chunk+1000]
all_tasks.append(write_email_bunch.subtask(some_users, execnum))

job = group(all_tasks)
result = job.apply_async()
return result.id

关于django - Python Celery group() - TypeError : [. ..] ** 之后的参数必须是一个映射,不长,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30996696/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com