gpt4 book ai didi

python - 将任务的结果输入到 Celery 中的 map 中

转载 作者:行者123 更新时间:2023-12-04 17:44:54 25 4
gpt4 key购买 nike

我觉得这是一个非常简单的用例,但不是我从文档中成功解决的用例。

这是我的问题的人为版本:

我有一个任务接受一个参数并返回一个列表。我想对列表中的每个元素应用另一个任务。然后我(视情况而定)可能想继续处理其他任务。

chain(
my.tasks.divide.s(data),
my.tasks.conquer.map(),
).apply_async().get()

chain(
my.tasks.divide.s(data),
my.tasks.conquer.map(),
my.tasks.unite.s(),
my.tasks.rule.s(),
).apply_async().get()

map 不能那样工作,但我觉得应该这样!我想要的只是从一个参数链接到一个列表,然后再链接回来。当然这不难正确地完成吗?

“解决方案”herehere看起来过于复杂,我不敢相信这是最佳实践。

The documentation在和弦、映射和链上对这种特殊情况没有帮助。

编辑:到目前为止我所拥有的。这里的问题是我必须调用 .apply_async().get() 两次才能获得实际结果,因为中间步骤返回一个和弦而不是实际结果:

@app.task()
def divide(item):
return [x for x in item]

@app.task()
def conquer(item):
return item.upper()

@app.task()
def rule(items):
return "".join(items)

@app.task()
def conquer_group(items):
return group([conquer.s(x) for x in items])

@app.task()
def rule_group(items):
return chord([conquer.s(x) for x in items], rule.s())

def main():
print chain(
divide.s("foobarbaz"),
rule_group.s(),
).apply_async().get().apply_async().get()

最佳答案

这里的问题是 map() 需要一个 iterable 来实例化。这就是为什么可以执行 (sample.map(range(100)) | another_task.s()).apply_async() 但不实例化接受的 map() 签名的原因链中最后一个任务的结果。我认为最简单的方法是在任务中实例化 map() 签名。

@app.task()
def divide(item):
return [x for x in item]

@app.task()
def conquer(item):
return item.upper()

@app.task()
def process(items):
return conquer.map(items)

def run():
(divide.s("foobar") | process.s()).apply_async()

关于python - 将任务的结果输入到 Celery 中的 map 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52552695/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com