gpt4 book ai didi

airflow - Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date

转载 作者:行者123 更新时间:2023-12-01 06:49:35 27 4
gpt4 key购买 nike

我想在触发器 DAG 中设置 execution_date。我正在使用操作符 TriggerDagRunOperator,这个操作符有参数 execution_date,我想设置当前的 execution_date。

def conditionally_trigger(context, dag_run_obj):
"""This function decides whether or not to Trigger the remote DAG"""
pp = pprint.PrettyPrinter(indent=4)
c_p = Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1"
print("Controller DAG : conditionally_trigger = {}".format(c_p))
if Variable.get("VAR2") == Variable.get("VAR1") and Variable.get("VAR3") == "1":
pp.pprint(dag_run_obj.payload)
return dag_run_obj

default_args = {
'owner': 'pepito',
'depends_on_past': False,
'retries': 2,
'start_date': datetime(2018, 12, 1, 0, 0),
'email': ['xxxx@yyyyy.net'],
'email_on_failure': False,
'email_on_retry': False,
'retry_delay': timedelta(minutes=1)
}

dag = DAG(
'DAG_1',
default_args=default_args,
schedule_interval="0 12 * * 1",
dagrun_timeout=timedelta(hours=22),
max_active_runs=1,
catchup=False
)

trigger_dag_2 = TriggerDagRunOperator(
task_id='trigger_dag_2',
trigger_dag_id="DAG_2",
python_callable=conditionally_trigger,
execution_date={{ execution_date }},
dag=dag,
pool='a_roz'
)

但我得到了下一个错误

name 'execution_date' is not defined



如果我设置
execution_date={{ 'execution_date' }},

或者
execution_date='{{ execution_date }}',

我得到

Traceback (most recent call last):

File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task

result = task_copy.execute(context=context)

File "/usr/local/lib/python3.6/site-packages/airflow/operators/dagrun_operator.py", line 78, in execute

replace_microseconds=False)

File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 98, in trigger_dag

replace_microseconds=replace_microseconds,

File "/usr/local/lib/python3.6/site-packages/airflow/api/common/experimental/trigger_dag.py", line 45, in _trigger_dag

assert timezone.is_localized(execution_date)

File "/usr/local/lib/python3.6/site-packages/airflow/utils/timezone.py", line 38, in is_localized

return value.utcoffset() is not None

AttributeError: 'str' object has no attribute 'utcoffset'



如果我想等于 DAG_1,有谁知道如何设置 DAG_2 的执行日期?

这个问题不同于 airflow TriggerDagRunOperator how to change the execution date因为在这篇文章中没有解释如何通过操作符TriggerDagRunOperator发送execution_date,只是说存在这种可能性。 https://stackoverflow.com/a/49442868/10269204

最佳答案

它以前没有模板化,但现在模板化了 commit

您可以使用新版本的 Airflow 尝试您的代码

另外对于硬编码的 execution_date,您需要设置 tzinfo:

from datetime import datetime, timezone
execution_date=datetime(2019, 3, 27, tzinfo=timezone.utc)
# or:
execution_date=datetime.now().replace(tzinfo=timezone.utc)

关于airflow - Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53778769/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com