gpt4 book ai didi

python - 在 TriggerDagRunOperator 中提供上下文

转载 作者:行者123 更新时间:2023-12-03 23:37:23 39 4
gpt4 key购买 nike

我有一个由另一个 dag 触发的 dag。我已经通过 DagRunOrder().payload 将一些配置变量传递给了这个 dag。字典以同样的方式official example已经完成了。

现在在这个 dag 我有另一个 TriggerDagRunOperator启动第二个 dag 并希望通过这些相同的配置变量。

我已成功访问 PythonOperator 中的有效负载变量像这样:

def run_this_func(ds, **kwargs):
print("Remotely received value of {} for message and {} for day".format(
kwargs["dag_run"].conf["message"], kwargs["dag_run"].conf["day"])
)

run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag
)

但同样的模式在 TriggerDagRunOperator 中不起作用:
def trigger(context, dag_run_obj, **kwargs):
dag_run_obj.payload = {
"message": kwargs["dag_run"].conf["message"],
"day": kwargs["dag_run"].conf["day"]
}
return dag_run_obj

trigger_step = TriggerDagRunOperator(
task_id="trigger_modelling",
trigger_dag_id="Dummy_Modelling",
provide_context=True,
python_callable=trigger,
dag=dag
)

它会产生关于使用 provide_context 的警告。 :
INFO - Subtask: /usr/local/lib/python2.7/dist-packages/airflow/models.py:1927: PendingDeprecationWarning: Invalid arguments were passed to TriggerDagRunOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
INFO - Subtask: *args: ()
INFO - Subtask: **kwargs: {'provide_context': True}
INFO - Subtask: category=PendingDeprecationWarning

这个错误表明我没有通过 conf :
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
INFO - Subtask: result = task_copy.execute(context=context)
INFO - Subtask: File "/usr/local/lib/python2.7/dist-packages/airflow/operators/dagrun_operator.py", line 64, in execute
INFO - Subtask: dro = self.python_callable(context, dro)
INFO - Subtask: File "/home/user/airflow/dags/dummy_responses.py", line 28, in trigger
INFO - Subtask: "message": kwargs["dag_run"].conf["message"],
INFO - Subtask: KeyError: 'dag_run'

我尝试过但也没有奏效的第二种模式是使用 params像这样的论点:
def trigger(context, dag_run_obj):
dag_run_obj.payload = {
"message": context['params']['message'],
"day": context['params']['day']
}
return dag_run_obj

trigger_step = TriggerDagRunOperator(
task_id="trigger_modelling",
trigger_dag_id="Dummy_Modelling",
python_callable=trigger,
params={
"message": "{{ dag_run.conf['message'] }}",
"day": "{{ dag_run.conf['day'] }}"
},
dag=dag
)

这种模式不会产生错误,而是将参数作为字符串传递到下一个 dag,即它不计算表达式。

如何访问 TriggerDagRunOperator 中的配置变量第二个 dag 吗?

最佳答案

解决了:
dag_run对象存储在上下文中,因此可以在 python_callable 中访问配置变量。的TriggerDagRunOperator使用这种模式:

def trigger(context, dag_run_obj):
dag_run_obj.payload = {
"message": context["dag_run"].conf["message"],
"day": context["dag_run"].conf["day"]
}
return dag_run_obj

trigger_step = TriggerDagRunOperator(
task_id="trigger_modelling",
trigger_dag_id="Dummy_Modelling",
python_callable=trigger,
dag=dag
)

关于python - 在 TriggerDagRunOperator 中提供上下文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48746561/

39 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com