gpt4 book ai didi

airflow - 在 airflow 2.0 taskflow API 中定义复杂的工作流依赖

转载 作者:行者123 更新时间:2023-12-05 06:49:36 31 4
gpt4 key购买 nike

假设我有如下定义的虚拟 DAG:

@dag(default_args=default_args,
schedule_interval=None,
start_date=days_ago(2))
def airflow_taskflow_api_dag():
cur_day = '2020-01-01'

@task()
def A(current_day: str):
return current_day + ' 10'

@task(multiple_outputs=True)
def B(current_date_time: str):
timestamp = calendar.timegm(
datetime.strptime(
current_date_time, '%Y-%m-%d %H').timetuple())

another_current_date_time_format = '2020-01-01T10:00:00'

return {
"timestamp": timestamp,
"new_cur_date_time": another_current_date_time_format
}

@task()
def C(livy_batch_id, start_time: str):
logging.info("Executing task C {}".format(start_time))
return "c_id"

@task()
def D(upstream_id, start_time: str):
logging.info("Executing task D {}".format(start_time))

# workflow dependencies
get_current_time = A(cur_day)
new_date_time = B(get_current_time)

livy_task = LivyOperator(
task_id='livy_task',
file="dummy_file",
class_name="dummy_class",
args=[
new_date_time['timestamp']
],
driver_memory='1G',
driver_cores='4',
executor_memory='1G',
executor_cores='4',
num_executors='4',
queue='default_queue',
livy_conn_id='livy_conn_id'
)

c_task = C(livy_task.output, new_date_time['new_cur_date_time'])

d_task = D(c_task, new_date_time['new_cur_date_time'])

airflow_taskflow_api_dag = airflow_taskflow_api_dag()

如果我这样写我的 DAG(从上游任务传递两个参数),依赖关系将是这样的: Screen Shot 2021-03-11 at 3 17 37 PM

但我想定义依赖关系:A >> B >> livy_task >> C >> D

有没有办法通过使用 taskflow api 来做到这一点?似乎如果我将任务 B 的输出传递给 livy_task、C 和 D,这三个任务将并行运行。

谢谢!

最佳答案

您当前的依赖关系图可以用 Airflow 1.10.* 语法表示为:

A >> B >> [C, D, livy_task]
livy_task >> C >> D

But I want to define the dependency with: A >> B >> livy_task >> C >> D

根据这个similar post , 不可能删除此依赖图中的现有边,同时保留现有运算符。

原因:
对于 B >> D,存在依赖关系,因为 B 的输出 new_date_time 的输入之一D.

(目前)还没有办法移除隐藏在 UI 中的那些隐式边缘,但它们不应该影响您的执行流程,因为默认情况下 trigger_rule 是 all_success

补充说明:
如果您想添加更多边,那很简单,因为 Taskflow API 支持使用按位运算符设置依赖关系(即 livy_task >> D)

关于airflow - 在 airflow 2.0 taskflow API 中定义复杂的工作流依赖,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66589763/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com