gpt4 book ai didi

airflow - 在 Airflow 中使用参数的示例?

转载 作者:行者123 更新时间:2023-12-05 01:19:09 27 4
gpt4 key购买 nike

我下载的文件经常在文件名中包含日期。

csat_surveys_2017_03_05.csv
03062017_roster.csv

我的代码单独处理这个问题。

  • 将已处理文件列表中的日期(基于切片文件名)与应该存在的预期日期(到当前日期的某个日期范围)进行比较
  • 对于我处理的每个文件,将文件名添加到数据库表中,并且只处理尚未添加到该表中的新文件

我可以(并且应该)使用 Airflow 计划日期来代替必须编写此逻辑的需要吗?每天,我的任务都安排运行。我采用该预定日期(也许减去 1 天)并将该值用作参数以作为要读取的文件名的一部分传递(在 pandas 中)。如果是这样,我可以看一个清晰的示例作为模板吗?

这是更好的方法吗,如果文件丢失或延迟几天,它会覆盖我吗(我希望任务失败,然后每天继续尝试直到它成功或者直到我注意到它并且可以提高给我们的客户的问题)?

最佳答案

我会说是的,使用 execution_date 可能是最佳实践。

要访问它,您需要一个模板化字段。一些默认的运算符已经有了这些,或者您可能想创建自己的运算符,它看起来像这样:

在您的 DAG 中,您的任务是:

my_task = MyOperator(
task_id='t1',
filename='prefix_{{ ds }}_suffix')

ds 是 airflow 宏,用于访问 execution_date 参数作为日期的字符串表示形式。

你的 MyOperator 看起来像:

class MyOperator(BaseOperator):
template_fields = ('filename')

def __init__(self, filename)
self.filename = filename

def execute(self, context):
download_file(self.filename)
do_other_stuff()

您可以在宏部分找到更多关于如何参数化任务的信息 https://airflow.incubator.apache.org/code.html#macros

关于airflow - 在 Airflow 中使用参数的示例?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42678533/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com