gpt4 book ai didi

python - Airflow 自定义 jinja2 过滤器

转载 作者:太空狗 更新时间:2023-10-30 01:32:05 24 4
gpt4 key购买 nike

我正在尝试为我的 airflow jinja2 模板添加自定义过滤器。

因为我在 S3 中的文件夹是这样的

/year/month/day/

,我的目的是像这样在变量屏幕中使用 yesterday_ds:

s3://logs.web.com/AWSLogs/{{ yesterday_ds | get_year }}/{{ yesterday_ds | get_month }}/{{ yesterday_ds | get_day }}/

我在一个 PR(我认为它已经合并..)中看到你可以在 dag 对象创建的 dag_args 参数中使用参数 'user_defined_filters' 来做到这一点 here

问题是,即使在执行此操作时,它也会显示“没有名为 get_year 的过滤器”。

这是我的代码:

dag.py

   dag = DAG(
dag_id='dag-name',
default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
template_searchpath=tmpl_search_path,
schedule_interval=timedelta(days=1),
max_active_runs=1,
)

实用程序.py

def get_dag_args(**kwargs):
return {
'owner' : kwargs.get('owner', 'owner_name'),
'depends_on_past' : kwargs.get('depends_on_past', False),
'start_date' : kwargs.get('start_date', datetime.now() - timedelta(1)),
'email' : kwargs.get('email', ['blabla@blabla.com']),
'retries' : kwargs.get('retries', 5),
'provide_context' : kwargs.get('provide_context', True),
'retry_delay' : kwargs.get('retry_delay', timedelta(minutes=5)),
'user_defined_filters': get_date_filters()
}


def get_date_filters():
return dict(
get_year=lambda date_string: date_string.strftime('%Y'),
get_month=lambda date_string: date_string.strftime('%m'),
get_day=lambda date_string: date_string.strftime('%d'),
)

有没有人看出我错在哪里?谢谢!

编辑

不幸的是,在 dag 定义之后打印这个,没有显示自定义过滤器:(。

jinja_env = dag.get_template_env()
print(jinja_env.filters)

此外,如果我尝试将其直接添加为 DAG 对象参数,如测试 @tests/models.py 中所示:

Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'

编辑 2

好的,我看到的是我的版本 1.8.0 而这个版本没有过滤器。有人知道如何通过 pip 下载 1.8.2rc 吗?或者我们不能?

最佳答案

Airflow 现在支持自定义过滤器和宏

工作代码示例:

from airflow import DAG
from datetime import datetime, timedelta

def first_day_of_month(any_day):
return any_day.replace(day=1)


def last_day_of_month(any_day):
next_month = any_day.replace(day=28) + timedelta(days=4) # this will never fail
return next_month - timedelta(days=next_month.day)


def isoformat_month(any_date):
return any_date.strftime("%Y-%m")


with DAG(
dag_id='generate_raw_logs',
default_args=default_args,
schedule_interval=timedelta(minutes=120),
catchup=False,
user_defined_macros={
'first_day_of_month': first_day_of_month,
'last_day_of_month': last_day_of_month,
},
user_defined_filters={
'isoformat_month': isoformat_month
}
)

关于python - Airflow 自定义 jinja2 过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45585815/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com