gpt4 book ai didi

Airflow 将 jinja 模板作为字符串

转载 作者:行者123 更新时间:2023-12-05 06:29:27 24 4
gpt4 key购买 nike

在 Airflow 中,我试图在 Airflow 中使用 jinja 模板,但问题是它没有被解析,而是被视为一个字符串。请看我的代码``

from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

def test_method(dag,network_id,schema_name):
print "Schema_name in test_method", schema_name
third_task = PythonOperator(
task_id='first_task_' + network_id,
provide_context=True,
python_callable=print_context2,
dag=dag)
return third_task

dag = DAG('testing_xcoms_pull', description='Testing Xcoms',
schedule_interval='0 12 * * *',
start_date= datetime.today(),
catchup=False)


def print_context(ds, **kwargs):
return 'Returning from print_context'

def print_context2(ds, **kwargs):
return 'Returning from print_context2'

def get_schema(ds, **kwargs):
# Returning schema name based on network_id
schema_name = "my_schema"
return get_schema

first_task = PythonOperator(
task_id='first_task',
provide_context=True,
python_callable=print_context,
dag=dag)


second_task = PythonOperator(
task_id='second_task',
provide_context=True,
python_callable=get_schema,
dag=dag)

network_id = '{{ dag_run.conf["network_id"]}}'

first_task >> second_task >> test_method(
dag=dag,
network_id=network_id,
schema_name='{{ ti.xcom_pull("second_task")}}')

``

Dag 创建失败,因为 '{{ dag_run.conf["network_id"]}}' 被 Airflow 视为字符串。谁能帮我解决我代码中的问题???

最佳答案

Airflow 运营商有一个名为 template_fields 的变量。这个变量通常在运算符类的顶部声明,查看 github 代码库中的任何运算符。

如果您尝试将 Jinja 模板语法传递到的字段不在 template_fields 列表中,则 Jinja 语法将显示为字符串。

关于Airflow 将 jinja 模板作为字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53476510/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com