gpt4 book ai didi

python - 如何运行 Luigi Pipeline 的并行实例 : Pid set already running

转载 作者:行者123 更新时间:2023-12-01 02:48:39 29 4
gpt4 key购买 nike

我有一个简单的管道。

我想使用 ID 2381 启动一次,然后在第一个作业运行时,我想使用 ID 231 启动第二次运行。第一次运行按预期完成。

第二次运行返回此响应

Pid(s) set([10362]) already running
Process finished with exit code 0

我开始这样运行

运行一个:

luigi.run(
cmdline_args=["--id='newId13822'", "--TaskTwo-id=2381"],
main_task_cls=TaskTwo()
)

运行两个:

luigi.run(
cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
main_task_cls=TaskTwo()
)

每个任务都有一个由 luigi 的 task_id_str(...) 方法生成的唯一 ID。为什么 luigi.paramater、TaskTwo-id 和 MockTarget 文件都不同时,luigi 认为任务已经在运行?

管道代码:

import time
import uuid
from luigi.mock import MockTarget
import luigi


class TaskOne(luigi.Task):
run_id = luigi.Parameter()

def output(self):
return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)

def run(self):
_out = self.output().open('w')
time.sleep(10)
_out.write(u"Hello World!\n")
_out.close()


class TaskTwo(luigi.Task):
id = luigi.Parameter(default=uuid.uuid4().__str__())

def output(self):
return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)

def requires(self):
return TaskOne(self.id)

def run(self):
_out = self.output().open('w')
time.sleep(10)
_out.write(u"Hello World!\n")
_out.close()

最佳答案

看起来这可能是因为您没有连接到调度程序服务器,因此它尝试启动调度程序进程两次。你的头脑清醒吗?

我能够让您的代码在命令行中运行,如下所示。首先,我创建了一个目录并将代码放入名为 luigitest.py 的文件中(减去 luigi.run() 命令)。我将目录更改为我创建的目录。然后我跑了:

luigid --background --pidfile ./luigid.pid --logdir . --state-path .

然后我在同一目录中打开了第二个终端。在我跑的第一个中:

PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13822 --TaskTwo-id 2381 --local-scheduler

在我跑的第二个中(大约一秒钟后):

PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13823 --TaskTwo-id 2382 --local-scheduler

它们都输出“Hello World!”

关于python - 如何运行 Luigi Pipeline 的并行实例 : Pid set already running,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45034961/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com