gpt4 book ai didi

python - 要求在 Ruffus 管道中运行函数之前创建一组文件

转载 作者:行者123 更新时间:2023-12-01 06:17:25 25 4
gpt4 key购买 nike

我正在使用 ruffus 编写管道。我有一个被并行调用多次的函数,它创建了多个文件。我想创建一个函数“combineFiles()”,在创建所有这些文件后调用该函数。由于它们在集群上并行运行,因此它们不会一起完成。我编写了一个函数“getFilenames()”,它返回需要创建的文件名集,但如何使 mergeFiles() 等待它们出现?

我尝试了以下方法:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
# I should only be called if every file in the list 'filenames' exists

我也尝试过装饰器:

@merge(getFilenames)

但这也不起作用。在创建 getFilenames 给出的文件之前,combineFiles 仍然会被错误地调用。如何使组合文件以这些文件存在为条件?

谢谢。

最佳答案

我是 Ruffus 的开发者。我不确定我完全理解你想要做什么,但这里是:

等待需要不同时间完成的作业才能运行管道的下一阶段正是 Ruffus 的目的,因此希望这很简单。

第一个问题是您是否知道预先创建了哪些文件,即在管道运行之前?让我们首先假设您这样做。

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

让我们编写一个虚拟函数,每次调用它时都会创建一个文件。在 Ruffus 中,任何输入和输出文件名分别包含在前两个参数中。我们没有输入文件名,因此我们的函数调用应如下所示:

create_file(None, "one.file")
create_file(None, "two.file")
create_file(None, "three.file")

create_file 的定义如下所示:

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
open(output_file_name, "w").write("dummy file")

每个文件都将通过 3 次单独的 create_file 调用来创建。如果您愿意,这些可以并行运行。

pipeline_run([create_file], multiprocess = 5)

现在合并文件。 “@Merge”装饰器确实就是为此而设置的。我们只需要将它链接到前面的函数即可:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())

只有在三次调用 create_file() 中所有文件都准备就绪时,才会调用 merge_file。

整个代码如下:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
# simulate create file process of indeterminate complexity
sleep(randint(1,5))
open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
output_file = open(output_file_name, "w")
for i in input_file_names:
output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

这就是结果:

>>> pipeline_run([merge_file], multiprocess = 5)

Job = [None -> two.file] completed
Job = [None -> three.file] completed
Job = [None -> one.file] completed
Completed Task = create_file
Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file

关于python - 要求在 Ruffus 管道中运行函数之前创建一组文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2465953/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com