gpt4 book ai didi

python - 如何在 Luigi 中使用早期任务的输出?

转载 作者:行者123 更新时间:2023-12-05 07:42:36 28 4
gpt4 key购买 nike

我正在编写一个管道,其中后面的任务需要读取前面任务的输出,这样他们就可以知道他们需要在他们的要求中传递哪些参数。

我在下面创建了一个简化的设置示例。

import random
import pickle
import luigi


class WriteNumbers(luigi.Task):

def requires(self):
pass

def run(self):
# pickle a random list of ints 1-100
numbers = [random.randint(1, 100) for _ in range(100)]
pickle.dump(numbers, open("./numbers.pkl", 'wb'))

def output(self):
return luigi.LocalTarget("./numbers.pkl")


class SquareNumber(luigi.Task):
number = luigi.IntParameter()

def requires(self):
pass

def run(self):
# given a number as the parameter, write a file containing its square
with open("./squared_{}".format(self.number), 'w') as f:
f.write(str(self.number ** 2))

def output(self):
return luigi.LocalTarget("./squared_{}".format(self.number))


class SquareAll(luigi.WrapperTask):

def requires(self):
yield WriteNumbers() # require the number list to be pickled first
numbers = pickle.load("./numbers.pkl") # load the number list
for n in numbers: # square each number in the number list
yield SquareNumber(number=n)

class CubeNumber(luigi.Task):
number = luigi.IntParameter()

def requires(self):
pass

def run(self):
# given a number as the parameter, write a file containing its cube
with open("./cubed_{}".format(self.number), 'w') as f:
f.write(str(self.number ** 3))

def output(self):
return luigi.LocalTarget("./cubed_{}".format(self.number))


class CubeAll(luigi.WrapperTask):

def requires(self):
yield WriteNumbers() # require the number list to be pickled first
numbers = pickle.load("./numbers.pkl") # load the number list
for n in numbers: # square each number in the number list
yield CubeNumber(number=n)

class CrunchNumbers(luigi.WrapperTask):
def requires(self):
yield SquareAll()
yield CubeAll()

if __name__ == '__main__':
luigi.run()

当通过 python luigi_example.py CrunchNumbers 运行时,将创建 100 个随机数并将列表 pickle 并转储到磁盘。 SquareAll 加载该 pickled 列表并使用它来请求具有所需参数的 SquareNumber 任务。 CubeAll 为其类似任务引用相同的结果文件。

问题在于,运行时会抛出异常,因为 numbers.pkl 文件尚不存在。

我怎样才能让后面的任务根据前面任务的输出生成依赖项?我在这里使用随机数来表示无法提前知道输出:我的实际应用程序正在处理来自 API 的数据。

最佳答案

您正在使用 Dynamic Dependencies并且这些需要从 run 方法调用(当 requires 的结果可用作 input 时),所以 CubeAllSquareAll 的结构应如下所示:

class SquareAll(luigi.WrapperTask):

def requires(self):
yield WriteNumbers() # require the number list to be pickled first

def run(self):
numbers_file = self.input()[0].path
numbers = pickle.load(numbers_file) # load the number list
for n in numbers: # square each number in the number list
yield SquareNumber(number=n)

关于python - 如何在 Luigi 中使用早期任务的输出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44359372/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com