gpt4 book ai didi

python - Airflow - 从 XCOM 创建动态任务

转载 作者:太空宇宙 更新时间:2023-11-03 11:13:46 25 4
gpt4 key购买 nike

我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我正在存储一个列表,我想使用列表中的每个元素来动态创建下游任务。

我的用例是我有一个上游运算符(operator)检查文件的 sftp 服务器并返回符合特定条件的文件名列表。我想为每个返回的文件名创建动态下游任务。

我已将其简化为以下内容,虽然它有效,但我觉得它不是惯用的 Airflow 解决方案。在我的用例中,我会编写一个从 python 运算符调用的 python 函数,该函数从 xcom 中提取值并返回它,而不是使用 pusher 函数。

我知道虽然我可以创建一个结合两者的自定义运算符,但我不认为创建一次性运算符是好的做法,我希望有另一种解决方案。

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
"owner": "test",
"depends_on_past": False,
"start_date": datetime(2018, 10, 27),
"email": ["test@mctest.com"],
"email_on_failure": False,
"email_on_retry": False,
"email_on_success": False,
"retries": 0,
"provide_context": True
}

dag = DAG("test", default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
task_id='pusher_task',
dag=dag,
python_callable=pusher
)

def bash_wrapper(task, **context):
return BashOperator(
task_id='dynamic'+task,
dag=dag,
bash_command='date'
)

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end

最佳答案

我不会做你想要实现的主要是因为:

  1. XCOM 值是在运行时
  2. 中生成的状态
  3. DAG 结构是在解析时间
  4. 中确定的

即使您使用类似以下内容来访问某些上游任务生成的 XCOM 值:

from airflow.models import TaskInstance
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def get_files_list(session):
execution_date = dag.previous_schedule(datetime.now())

// Find previous task instance:
ti = session.query(TaskInstance).filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.task_id == upstream_task_id).first()
if ti:
files_list = ti.xcom_pull()
if files_list:
return files_list
// Return default state:
return {...}


files_list = get_files_list()
// Generate tasks based on upstream task state:
task = PythonOperator(
...
xcom_push=True,
dag=dag)

但这会表现得很奇怪,因为 DAG 解析和任务执行没有按照您希望的方式同步。

如果你想这样做的主要原因是并行文件处理,我会有一些静态数量的处理任务(由所需的并行度决定)从上游任务的 XCOM 值读取文件列表并对相关部分进行操作那个 list 。

另一种选择是使用 Apache Spark 等分布式计算框架来并行处理文件。

关于python - Airflow - 从 XCOM 创建动态任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55672724/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com