gpt4 book ai didi

python - Airflow:如何从 PythonOperator 中的 python_callable 内部创建子运算符

转载 作者:行者123 更新时间:2023-12-04 00:58:36 25 4
gpt4 key购买 nike

我有一个简单的 python 运算符,定义如下:

loop_records = PythonOperator(
task_id = 'loop_records',
provide_context = True,
python_callable = loop_topic_records,
dag = dag
)

这个 python 运算符调用 loop_topic_records ,定义如下:
def loop_topic_records(**context):
parent_dag = context['dag']
for i in range(3):
op = DummyOperator(
task_id="child_" + str(i),
dag=parent_dag
)
logging.info('Child operator ' + str(i))
loop_records >> op

我看到代码没有引发任何错误。它甚至在日志中打印 Child operator 0..2。但是,在 dag Graph view 中我没有看到子操作符,我只看到 loop_records 节点,就好像我的 dag 只包含一个操作符一样。那么,这有什么问题呢?我该如何解决?

最佳答案

你不能为所欲为。每个 DAG 一旦被 Airflow 加载,就是静态的,并且不能从正在运行的任务中改变。您从任务内部对 DAG 所做的任何更改都将被忽略。

你可以做的是,使用 airflow_multi_dagrun plugin 提供的多 DAG 运行运算符启动其他 DAG;创建 DAG 的 DAG,可以这么说:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator

def gen_topic_records(**context):
for i in range(3):
# generate `DagRunOrder` objects to pass a payload (configuration)
# to the new DAG runs.
yield DagRunOrder(payload={"child_id": i})
logging.info('Triggering topic_record_dag #%d', i)

loop_topic_record_dags = TriggerMultiDagRunOperator(
task_id='loop_topic_record_dags',
dag=dag,
trigger_dag_id='topic_record_dag',
python_callable=gen_topic_records,
)

以上将触发一个名为 topic_record_dag 的 DAG 启动,3 次。在该 DAG 中的运算符内部,您可以通过 dag_run.conf 对象(在模板中)或 context['dag_run'].conf 引用(在 PythonOperator() 代码中,设置 provide_context=True )访问任何设置为有效负载的内容。

如果在完成这 3 个 DAG 后您需要做其他工作,您只需要在上述 DAG 中添加一个传感器即可。传感器是等待特定外部信息可用的运算符(operator)。在这里使用一个在所有子 DAG 完成时触发的。同一个插件有一个 MultiDagRunSensor 这正是你在这里需要的,当所有由 TriggerMultiDagRunOperator 任务启动的 DAG 完成(成功或失败)时,它会触发:
from airflow import DAG
from airflow.operators.multi_dagrun import MultiDagRunSensor

wait_for_topic_record_dags = MultiDagRunSensor(
task_id='wait_for_topic_record_dags',
dag=dag
)

loop_topic_record_dags >> wait_for_topic_record_dags

然后在该传感器之后放置更多运算符(operator)。

关于python - Airflow:如何从 PythonOperator 中的 python_callable 内部创建子运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60581074/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com