gpt4 book ai didi

python - Luigi - 重写任务需要/输入

转载 作者:太空狗 更新时间:2023-10-30 01:31:44 24 4
gpt4 key购买 nike

我正在使用 luigi 来执行一系列任务,如下所示:

class Task1(luigi.Task):
stuff = luigi.Parameter()

def output(self):
return luigi.LocalTarget('test.json')

def run(self):
with self.output().open('w') as f:
f.write(stuff)


class Task2(luigi.Task):
stuff = luigi.Parameter()

def requires(self):
return Task1(stuff=self.stuff)

def output(self):
return luigi.LocalTarget('something-else.json')

def run(self):
with self.output().open('w') as f:
f.write(stuff)

当我像这样开始整个工作流程时,这完全符合预期:

luigi.build([Task2(stuff='stuff')])

使用 luigi.build 时,您还可以通过显式传递参数来运行多个任务,as per this example in the documentation .

但是,在我的情况下,我还希望能够完全独立于它在工作流中的参与来运行 Task2 的业务逻辑。这适用于未实现 requires 的任务,as per this example .

我的问题是,如何将此方法作为工作流程的一部分以及单独运行?显然,我可以只添加一个新的私有(private)方法,如 _my_custom_run,它获取数据并返回结果,然后在 run 中使用这个方法,但它只是感觉有点像这应该融入框架,所以这让我觉得我误解了 Luigi 的最佳实践(仍在学习框架)。感谢任何建议,谢谢!

最佳答案

听起来你想要dynamic requirements.使用该示例中显示的模式,您可以读取配置或传递具有任意数据的参数,并且 yield 仅根据配置中的字段要求执行任务。

# tasks.py
import luigi
import json
import time


class Parameterizer(luigi.Task):
params = luigi.Parameter() # Arbitrary JSON

def output(self):
return luigi.LocalTarget('./config.json')

def run(self):
with self.output().open('w') as f:
json.dump(params, f)

class Task1(luigi.Task):
stuff = luigi.Parameter()

def output(self):
return luigi.LocalTarget('{}'.format(self.stuff[:6]))

def run(self):
with self.output().open('w') as f:
f.write(self.stuff)


class Task2(luigi.Task):
stuff = luigi.Parameter()
params = luigi.Parameter()


def output(self):
return luigi.LocalTarget('{}'.format(self.stuff[6:]))

def run(self):

config = Parameterizer(params=self.params)
yield config

with config.output().open() as f:
parameters = json.load(f)

if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
pass
with self.output().open('w') as f:
f.write(self.stuff)

if __name__ == '__main__':
cf_json = '{"runTask1": True}'

print("Trying to run with Task1...")
luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)

time.sleep(10)

cf_json = '{"runTask1": False}'

print("Trying to run WITHOUT Task1...")
luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)

(只需调用python tasks.py即可执行)

我们可以很容易地想象将多个参数映射到多个任务,或者在允许执行各种任务之前应用自定义测试。我们还可以重写它以从 luigi.Config 中获取参数。

另请注意 Task2 中的以下控制流:

    if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
pass

在这里我们可以运行替代任务,或者动态调用任务,正如我们在 luigi 存储库的示例中看到的那样。例如:

    if parameters["runTask1"]:
yield Task1(stuff=self.stuff)
else:
# self.stuff is not automatically parsed to int, so this list comp is valid
data_dependent_deps = [Task1(stuff=x) for x in self.stuff]
yield data_dependent_deps

这可能比简单的 run_standalone() 方法复杂一点,但我认为它最接近您在记录的 luigi 模式中寻找的东西。

来源:https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies

关于python - Luigi - 重写任务需要/输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49987595/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com