gpt4 book ai didi

python - luigi 依赖项在运行时发生变化

转载 作者:太空宇宙 更新时间:2023-11-04 05:13:39 25 4
gpt4 key购买 nike

我有一个 luigi将我的原始数据拆分成较小文件的预处理任务。然后这些文件将由实际管道处理。

所以关于参数,我想要求每个管道都有一个预处理文件 ID 作为参数。但是,此文件 ID 仅在预处理步骤中生成,因此仅在运行时才知道。为了说明我的想法,我提供了这个不起作用的代码:

import luigi
import subprocess
import random


class GenPipelineFiles(luigi.Task):

input_file = luigi.Parameter()

def requires(self):
pass

def output(self):

for i in range(random.randint(0,10)):
yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i))

def run(self):

for iout in self.output:
command = "touch {}".format(iout.fname)
subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):
pass


class Experiment(luigi.WrapperTask):

input_file = luigi.Parameter(default="ex")

def requires(self):

file_ids = GenPipelineFiles(input_file=self.input_file)

for file_id in file_ids:
yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id)


luigi.run()

包装器任务 Experiment应该

  1. 首先,以某种方式需要将原始数据拆分为文档

  2. 其次,要求实际流水线带有预处理得到的文件id。

GenPipelineFiles 中输出文件的随机数表示这不能硬编码到 Experiment 中的 requires .

一个可能与这个问题相关的问题是,luigi任务正确地只有一个输入目标和一个输出目标。可能是关于如何在 GenPipelineFiles 中对多个输出建模的注释也可以解决问题。

最佳答案

处理多个输出的一种简单方法是创建一个以输入文件命名的目录,并将拆分的输出文件放入以输入文件命名的目录中。这样依赖任务就可以检查目录是否存在。假设我有一个输入文件 123.txt,然后我用文件 1.txt、2.txt、3.txt 创建一个目录 123_split 作为 GenPipelineFiles 的输出,然后创建一个目录 123_processed 和 1 .txt、2.txt、3.txt 作为 RunPipelineOnSmallChunk 的输出。

对于 Experiment 中的 requires 方法,您必须返回要运行的任务,例如在列表中。您编写 file_ids = GenPipelineFiles(input_file=self.input_file) 的方式让我觉得该对象的 run 方法没有被调用,因为它没有被该方法返回。

这里有一些示例代码,可以在每个文件的基础上处理目标(但不是每个文件的任务)。我仍然认为使用目录的单个输出目标或某种类型的哨兵文件来指示您已完成会更安全。除非任务确保创建每个目标,否则原子性就会丢失。

PYTHONPATH=. luigi --module sampletask RunPipelineOnSmallChunk --local-scheduler

示例任务.py

import luigi
import os
import subprocess
import random


class GenPipelineFiles(luigi.Task):

inputfile = luigi.Parameter()
num_targets = random.randint(0,10)

def requires(self):
pass

def get_prefix(self):
return self.inputfile.split(".")[0]

def get_dir(self):
return "split_{}".format(self.get_prefix())

def output(self):
targets = []
for i in range(self.num_targets):
targets.append(luigi.LocalTarget(" {}/{}_{}.txt".format(self.get_dir(), self.get_prefix(), i)))
return targets

def run(self):
if not os.path.exists(self.get_dir()):
os.makedirs(self.get_dir())
for iout in self.output():
command = "touch {}".format(iout.path)
subprocess.call(command, shell=True)


class RunPipelineOnSmallChunk(luigi.Task):

inputfile = luigi.Parameter(default="test")

def get_prefix(self):
return self.inputfile.split(".")[0]

def get_dir(self):
return "processed_{}".format(self.get_prefix())

@staticmethod
def clean_input_path(path):
return path.replace("split", "processed")

def requires(self):
return GenPipelineFiles(self.inputfile)

def output(self):
targets = []
for target in self.input():
targets.append(luigi.LocalTarget(RunPipelineOnSmallChunk.clean_input_path(target.path)))
return targets

def run(self):
if not os.path.exists(self.get_dir()):
os.makedirs(self.get_dir())
for iout in self.output():
command = "touch {}".format(iout.path)
subprocess.call(command, shell=True)

关于python - luigi 依赖项在运行时发生变化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42365286/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com