Airflow debugging: How to skip backfill job execution when running DAG in vscode

Reposted · Author: 行者123 · Updated: 2023-12-04 03:44:53

I have set up Airflow and run a DAG with the following VS Code debug configuration:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "justMyCode": false,
            "env": {
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True",
                "LC_ALL": "en_US.UTF-8",
                "LANG": "en_US.UTF-8"
            }
        }
    ]
}
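The two AIRFLOW__* variables from the launch config can also be set in-process at the top of the DAG file, which makes the script debuggable even when launched without this launch.json (a minimal sketch; the values simply mirror the configuration above, and they must be set before anything imports airflow, since Airflow reads its configuration at import time):

```python
import os

# Mirror the launch.json environment in-process. Set these before
# any airflow import, because Airflow reads its config at import time.
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__DEBUG__FAIL_FAST"] = "True"

print(os.environ["AIRFLOW__CORE__EXECUTOR"])  # → DebugExecutor
```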

It runs the file, and my breakpoints in the DAG definitions hit as expected. Then, at the end of the file, it executes dag.run(), I wait forever for the DAG to backfill, and my breakpoints inside the tasks' python_callable functions never hit.

What Airflow secret am I not seeing?

Here is my DAG:

# Imports assumed for Airflow 1.10.x, since provide_context is used below
from pprint import pprint

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.utils.dates import days_ago

# scheduled to run every minute, poke for a new file every ten seconds
dag = DAG(
    dag_id='download-from-s3',
    start_date=days_ago(2),
    catchup=False,
    schedule_interval='*/1 * * * *',
    is_paused_upon_creation=False
)

def new_file_detection(**context):
    print("File found...")  # a breakpoint here never lands
    pprint(context)

init = BashOperator(
    task_id='init',
    bash_command='echo "My DAG initiated at $(date)"',
    dag=dag,
)

file_sensor = S3KeySensor(
    task_id='file_sensor',
    poke_interval=10,  # every 10 seconds
    timeout=60,
    bucket_key="s3://inbox/new/*",
    bucket_name=None,
    wildcard_match=True,
    soft_fail=True,
    dag=dag
)

file_found_message = PythonOperator(
    task_id='file_found_message',
    provide_context=True,
    python_callable=new_file_detection,
    dag=dag
)

init >> file_sensor >> file_found_message

if __name__ == '__main__':
    dag.clear(reset_dag_runs=True)
    dag.run()  # this triggers a backfill job

Best Answer

This works as expected for me. I can set breakpoints at the DAG level or inside the python callables' definitions and step through them with the VSCode debugger.

I'm using the same debug settings you provided, but I changed the parameter reset_dag_runs=True to dag_run_state=State.NONE in the dag.clear() call, as specified on the DebugExecutor docs page. I believe this changed in one of the latest versions.

Regarding backfills, I'm setting catchup=False on the DAG arguments (it works both ways). Important note: I'm running Airflow version 2.0.0.
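The catchup=False point matters here: the question's DAG has start_date=days_ago(2) and a per-minute schedule, so a catchup backfill would try to create one run for every elapsed minute before ever reaching "now". A rough back-of-the-envelope sketch:

```python
from datetime import datetime, timedelta

# With start_date two days in the past and schedule_interval
# '*/1 * * * *', a catchup backfill would queue one run per
# elapsed minute.
now = datetime.utcnow()
start_date = now - timedelta(days=2)   # days_ago(2)
interval = timedelta(minutes=1)        # '*/1 * * * *'

missed_runs = (now - start_date) // interval
print(missed_runs)  # → 2880 (2 days * 24 h * 60 min)
```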

Here's an example using the same code from example_xcomp.py, which ships with the default installation:

Debug settings:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "internalConsole",
            "justMyCode": false,
            "env": {
                "AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
                "AIRFLOW__DEBUG__FAIL_FAST": "True"
            }
        }
    ]
}

Example DAG:

import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    'excom_xample',
    schedule_interval="@once",
    start_date=days_ago(2),
    default_args={'owner': 'airflow'},
    tags=['example'],
    catchup=False
)

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    logging.info("log before PUSH")  # <<<<<<<<<<< Before landing on breakpoint
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and
    check if the pushed values match the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("PRINT Line after breakpoint ")  # <<<< After landing on breakpoint
    if pulled_value_1 != value_1:
        raise ValueError("The two values differ "
                         f"{pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=['push', 'push_by_returning'])
    if pulled_value_1 != value_1:
        raise ValueError(
            f'The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError(
            f'The two values differ {pulled_value_2} and {value_2}')


push1 = PythonOperator(
    task_id='push',
    dag=dag,
    python_callable=push,
)

push2 = PythonOperator(
    task_id='push_by_returning',
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id='puller',
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]

if __name__ == '__main__':
    from airflow.utils.state import State
    dag.clear(dag_run_state=State.NONE)
    dag.run()
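For reference, the XCom push/pull flow this example exercises can be sketched with a plain dict standing in for Airflow's XCom table (the helper names here are illustrative, not Airflow APIs):

```python
# A plain dict stands in for Airflow's XCom table.
xcom_store = {}

def xcom_push(task_id, value, key="return_value"):
    """Store a value under (task_id, key), like ti.xcom_push."""
    xcom_store[(task_id, key)] = value

def xcom_pull(task_ids, key="return_value"):
    """Fetch one value, or a list of values for a list of task ids."""
    if isinstance(task_ids, (list, tuple)):
        return [xcom_store.get((t, key)) for t in task_ids]
    return xcom_store.get((task_ids, key))

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}

xcom_push('push', value_1)               # explicit push
xcom_push('push_by_returning', value_2)  # Airflow pushes return values itself

pulled_1, pulled_2 = xcom_pull(['push', 'push_by_returning'])
assert pulled_1 == value_1 and pulled_2 == value_2
print("pulled values match pushed values")
```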

Regarding Airflow debugging: How to skip backfill job execution when running DAG in vscode, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65335701/
