gpt4 book ai didi

python - 如何在单元测试中测试 Airflow dag?

转载 作者:太空狗 更新时间:2023-10-29 21:41:28 53 4
gpt4 key购买 nike

我正在尝试在测试环境中测试具有多个任务的 dag。我能够测试与 dag 关联的单个任务,但我想在 dag 中创建多个任务并启动第一个任务。为了测试我正在使用的 dag 中的一项任务

task1.run()

正在执行。但是,当我在 dag 的下游一个接一个地执行许多任务时,这种方法就不起作用了。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

t2.set_upstream(t1)

t1.run() # It is executing just first task.

为了运行第二个任务,我必须使用 t2.run() 来运行,我在设计 DAG 时不想要它。如何实现?

最佳答案

我不确定我是否理解您的问题,但我会尝试着开始回答。

如果您的目标只是手动运行 DAG 或其任务的子集,您可以从 CLI 实现,例如:

  • $ airflow run ... - 运行任务实例
  • $ airflow test ... - 在不检查依赖关系或在数据库中记录状态的情况下测试任务实例
  • $ airflow trigger_dag ... - 触发 DAG 的特定 DAG 运行

CLI 文档 - https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html

我认为 airflow run 命令与您的用例最相关。

在运行时,DAG 中的任务调度和下游依赖项的运行一旦它们的要求得到满足,都由执行器自动处理。您不需要在代码中的任何位置调用 run()。

就 run 方法本身而言,代码仍然存在:

问题

  1. 当您说“在测试环境中”测试 DAG 时,您到底是什么意思?比如 CI 或单元测试?
  2. 此代码是用于测试还是来自您的实际 DAG 之一的代码?
  3. 这与您最近提出的其他问题有关吗 Test Dag run for Airflow 1.9 in unittest

关于python - 如何在单元测试中测试 Airflow dag?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50003048/

53 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com