gpt4 book ai didi

airflow - 运行时添加到DAG的任务无法调度

转载 作者:行者123 更新时间:2023-12-02 03:53:58 24 4
gpt4 key购买 nike

我的想法是有一个任务 foo 生成输入列表(用户、报告、日志文件等),并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

所以,理想情况下,我的 DAG 应该是这样的: enter image description here

这里唯一的变量是生成的任务数量。我想在所有这些完成后执行更多任务,因此为每个任务创建一个新的 DAG 似乎不合适。

这是我的代码:

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)

def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))

for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)

op.set_downstream(bar_operator)
op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op = DummyOperator(
task_id='dummy',
dag=dag
)

dummy_op.set_upstream(gen_subdag_node_op)

bar_operator = DummyOperator(
task_id='bar',
dag=dag)

bar_operator.set_upstream(dummy_op)

在日志中,我可以看到 gen_nodes 已正确执行(即 Iterateworker 5 等)。然而,新任务并未被调度,也没有证据表明它们已被执行。

我在网上找到了相关的代码示例,such as this ,但无法使其发挥作用。我错过了什么吗?

或者,是否有更合适的方法来解决这个问题(隔离工作单元)?

最佳答案

目前,airflow 不支持在 dag 运行时添加/删除任务。

工作流顺序将是 dag 运行开始时评估的任何内容。

See the second paragraph here.

这意味着您无法根据运行中发生的情况添加/删除任务。您可以根据与运行无关的内容在 for 循环中添加 X 任务,但运行开始后不会更改工作流形状/顺序。

很多时候,您可以使用 BranchPythonOperator 在 dag 运行期间做出决定(这些决定可以基于您的 xcom 值),但它们必须是决定继续工作流程中已存在的分支。

Dag 运行,Dag 定义在 Airflow 中以不完全直观的方式分离,但或多或​​少在 dag 运行中创建/生成的任何内容(xcomdag_run .conf 等)不可用于定义 dag 本身。

关于airflow - 运行时添加到DAG的任务无法调度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44626883/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com