我使用 Luigi Orchestrator 编写了以下 Python 代码。
class AggregateArtists(luigi.Task):
date = luigi.DateParameter(default=date.today() - timedelta(days=1))
def requires(self):
return []
def run(self):
...
我想在我的 run()
函数中使用日期参数。问题是我不知道它是什么类型。在文档中,这个参数似乎是一个datetime.date
,所以我应该能够使用self.date.strftime()
方法。但此方法不适用于 DateParameters
。
我的问题是:
您的代码不完整,但我猜其余部分如下所示。您一定在某个地方有错误,因为它有效:DateParameter
返回一个 python 日期值。请参阅luigi source code for details .
我的tasks/foo.py
:
from datetime import date, timedelta
import luigi
class AggregateArtists(luigi.Task):
date = luigi.DateParameter(default=date.today() - timedelta(days=1))
def output(self):
return luigi.LocalTarget("/tmp/foobar.txt")
def run(self):
with self.output().open('w') as out_file:
out_file.write(self.date.strftime("%Y%m%d") + "\n")
if __name__ == "__main__":
luigi.run()
运行任务:
$ python tasks/foo.py AggregateArtists --local-scheduler
DEBUG: Checking if AggregateArtists(date=2015-12-03) is complete
INFO: Scheduled AggregateArtists(date=2015-12-03) (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) running AggregateArtists(date=2015-12-03)
INFO: [pid 21831] Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) done AggregateArtists(date=2015-12-03)
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=179482616, workers=1, host=matagus-laptop, username=matagus, pid=21831) was stopped. Shutting down Keep-Alive thread
打印输出文件的内容:
$ cat /tmp/foobar.txt
20151203
我是一名优秀的程序员,十分优秀!