gpt4 book ai didi

Python Luigi - 满意时继续执行外部任务

转载 作者:太空狗 更新时间:2023-10-30 02:57:11 28 4
gpt4 key购买 nike

我正在研究一个 Luigi 管道,它检查是否存在手动创建的文件,如果存在,则继续执行下一个任务:

import luigi, os

class ExternalFileChecker(luigi.ExternalTask):
task_namespace='MyTask'
path = luigi.Parameter()
def output(self):
return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))

class ProcessExternalFile(luigi.Task):
task_namespace='MyTask'
path = luigi.Parameter()

def requires(self):
return ExternalFileChecker(path=self.path)

def output(self):
dirname = self.path
outfile = os.path.join(dirname, 'processedfile.txt')
return luigi.LocalTarget(outfile)

def run(self):
#do processing

if __name__ == '__main__':
path = r'D:\MyPath\luigi'
luigi.run(['MyTask.ProcessExternalFile','--path', path,\
'--worker-retry-external-tasks','--scheduler-retry-delay', '20',\
'--worker-keep-alive'])

我想要的是 luigi 在我创建手动文件并将其粘贴到路径后继续。当我这样做时,它没有找到文件并继续执行任务,而是每隔几秒重新检查一次新任务:

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

经过相当长的时间(15-20 分钟左右)后,luigi 将找到该文件,然后可以按需要继续。我能做些什么来防止这种延迟?我希望 luigi 在文件存在后立即继续。

最佳答案

需要注意的几点:

  1. Luigi 工作线程不会退出,直到至少有一个任务在运行(或者如果 keep_alive = True,在这种情况下,它会在没有更多待处理任务时退出)。
  2. 有失败任务的重试逻辑,默认重试间隔为15分钟。
  3. 重试逻辑的工作原理如下。在指定的重试间隔后,调度程序将忘记任务的失败(与单击 UI 中的“原谅失败”按钮相同),并将任务的状态更改为挂起。下次工作人员向调度程序请求工作时,可以将此任务分配给工作人员。
  4. 未完成的外部任务计为失败,取决于重试逻辑。
  5. 外部任务的重试行为由 [worker] 部分中的 retry_external_tasks 配置设置控制。

我认为您正在观察的是这样的东西。您的管道正在运行,任务 ProcessExternalFile 失败,然后您添加文件,任务在 retry_delay 期间保持 FAILED,最后变为 PENDING 并给工作人员再次执行此任务,此时它会发现文件并且任务完成。

这是否是期望的行为取决于您。如果您希望更快地找到文件,您可以更改重试间隔。或者您可以在 run 方法中执行无限的 while 循环并定期检查文件,并在找到时跳出循环。您还可以配置 Luigi 以完全禁用重试逻辑。

关于Python Luigi - 满意时继续执行外部任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37988423/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com