gpt4 book ai didi

airflow - 在单元测试中运行 Airflow 1.9 的测试 Dag

转载 作者:行者123 更新时间:2023-12-02 18:55:18 24 4
gpt4 key购买 nike

我已经实现了运行单个 dag 的测试用例,但它似乎在 1.9 中不起作用,可能是由于 Airflow 1.8 中引入了更严格的池所致。我正在尝试运行以下测试用例:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

class DAGTest(unittest.TestCase):
def make_tasks(self):
dag = DAG('test_dag', description='a test',
schedule_interval='@once',
start_date=datetime(2018, 6, 26),
catchup=False)


du1 = DummyOperator(task_id='dummy1', dag=dag)
du2 = DummyOperator(task_id='dummy2', dag=dag)
du3 = DummyOperator(task_id='dummy3', dag=dag)
du1 >> du2 >> du3
dag.run()

def test_execute(self):
self.make_tasks()

异常:

Dependencies not met for <TaskInstance: test_dag.dummy3 2018-06-26 00:00:00  [upstream_failed]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all
upstream tasks to have succeeded, but found 1 non-success(es).
upstream_tasks_state={'skipped': 0L, 'successes': 0L, 'failed': 0L,'upstream_failed': 1L, 'done': 1L, 'total': 1}, upstream_task_ids=['dummy2']

我做错了什么?我尝试过 LocalExecutor 和 SequentialExecutor

环境:

Python 2.7
Airflow 1.9

我相信它试图同时执行所有任务而不考虑依赖关系。注意:类似的代码可在 Airflow 1.7 中使用

最佳答案

我不熟悉 Airflow 1.7,但我猜它没有 Airflow1.8 及更高版本所具有的“DagBag”概念。

您无法运行这样创建的 DAG,因为 dag.run() 启动一个新的 python 进程,并且它必须从它解析的 dag 文件夹中找到 DAG磁盘 - 但它不能。这作为消息包含在输出中(但您没有包含完整的错误消息/输出)

您想通过在测试文件中创建 dag 来测试什么?是自定义运算符吗?那么你最好直接测试它。例如,以下是我如何独立测试自定义运算符:

class MyPluginTest(unittest.TestCase)
def setUp(self):
dag = DAG(TEST_DAG_ID, schedule_interval='* * * * Thu', default_args={'start_date': DEFAULT_DATE})
self.dag = dag
self.op = myplugin.FindTriggerFileForExecutionPeriod(
dag=dag,
task_id='test',
prefix='s3://bucket/some/prefix',
)
self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)

# Other S3 setup here, specific to my test


def test_execute_no_trigger(self):
with self.assertRaises(RuntimeError):
self.ti.run(ignore_ti_state=True)

# It shouldn't have anything in XCom
self.assertEqual(
self.ti.xcom_pull(task_ids=self.op.task_id),
None
)

关于airflow - 在单元测试中运行 Airflow 1.9 的测试 Dag,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51047050/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com