- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我设置了 airflow 并使用以下 vscode 调试配置运行 DAG:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false,
"env":{
"AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
"AIRFLOW__DEBUG__FAIL_FAST": "True",
"LC_ALL": "en_US.UTF-8",
"LANG": "en_US.UTF-8"
}
}
]
}
它运行文件,我的断点 DAG defs 按预期中断,然后在文件末尾:它执行 dag.run()
然后我永远等待 dag 回填,并且我在任务的 python_callable 函数中的断点永远不会中断。
我没有看到什么 Airflow secret ?
这是我的爸爸:
# scheduled to run every minute, poke for a new file every ten seconds
dag = DAG(
dag_id='download-from-s3',
start_date=days_ago(2),
catchup=False,
schedule_interval='*/1 * * * *',
is_paused_upon_creation=False
)
def new_file_detection(**context):
print("File found...") # a breakpoint here never lands
pprint(context)
init = BashOperator(
task_id='init',
bash_command='echo "My DAG initiated at $(date)"',
dag=dag,
)
file_sensor = S3KeySensor(
task_id='file_sensor',
poke_interval=10, # every 10 seconds
timeout=60,
bucket_key="s3://inbox/new/*",
bucket_name=None,
wildcard_match=True,
soft_fail=True,
dag=dag
)
file_found_message = PythonOperator(
task_id='file_found_message',
provide_context=True,
python_callable=new_file_detection,
dag=dag
)
init >> file_sensor >> file_found_message
if __name__ == '__main__':
dag.clear(reset_dag_runs=True)
dag.run() #this triggers a backfill job
最佳答案
这对我来说正如预期的那样有效。我可以在 DAG 级别或在 python callables 定义内设置断点,然后使用 VSCode 调试器遍历它们。
我正在使用您提供的相同调试设置,但我在 dag.clear 期间将参数
调用,如 reset_dag_runs=True
更改为 dag_run_state=State.NONE
()DebugExecutor
中指定的那样 docs page .我相信这在其中一个最新版本中已经发生了变化。
关于 backfills,我在 DAG 参数上设置了 catchup=False
(它双向工作)。重要说明,我正在运行 Airflow 2.0.0 版。
这是一个使用默认安装附带的 example_xcomp.py
中的相同代码的示例:
调试设置:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "internalConsole",
"justMyCode": false,
"env":{
"AIRFLOW__CORE__EXECUTOR": "DebugExecutor",
"AIRFLOW__DEBUG__FAIL_FAST": "True",
}
}
]
}
DAG 示例:
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
dag = DAG(
'excom_xample',
schedule_interval="@once",
start_date=days_ago(2),
default_args={'owner': 'airflow'},
tags=['example'],
catchup=False
)
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
def push(**kwargs):
"""Pushes an XCom without a specific target"""
logging.info("log before PUSH") # <<<<<<<<<<< Before landing on breakpoint
kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
def push_by_returning(**kwargs):
"""Pushes an XCom without a specific target, just by returning it"""
return value_2
def puller(**kwargs):
"""Pull all previously pushed XComs and
check if the pushed values match the pulled values."""
ti = kwargs['ti']
# get value_1
pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
print("PRINT Line after breakpoint ") # <<<< After landing on breakpoint
if pulled_value_1 != value_1:
raise ValueError("The two values differ"
f"{pulled_value_1} and {value_1}")
# get value_2
pulled_value_2 = ti.xcom_pull(task_ids='push_by_returning')
if pulled_value_2 != value_2:
raise ValueError(
f'The two values differ {pulled_value_2} and {value_2}')
# get both value_1 and value_2
pulled_value_1, pulled_value_2 = ti.xcom_pull(
key=None, task_ids=['push', 'push_by_returning'])
if pulled_value_1 != value_1:
raise ValueError(
f'The two values differ {pulled_value_1} and {value_1}')
if pulled_value_2 != value_2:
raise ValueError(
f'The two values differ {pulled_value_2} and {value_2}')
push1 = PythonOperator(
task_id='push',
dag=dag,
python_callable=push,
)
push2 = PythonOperator(
task_id='push_by_returning',
dag=dag,
python_callable=push_by_returning,
)
pull = PythonOperator(
task_id='puller',
dag=dag,
python_callable=puller,
)
pull << [push1, push2]
if __name__ == '__main__':
from airflow.utils.state import State
dag.clear(dag_run_state=State.NONE)
dag.run()
关于 Airflow 调试 : How to skip backfill job execution when running DAG in vscode,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65335701/
我按如下方式创建我的 Airflow DAG: dag = DAG(...) 但在多个教程和类(class)中,我看到他们像这样使用 with ... as 子句: with DAG(...) as
我对 DAG、Airflow 和 Python 语法有点陌生(我从 Java 学习编码),但我有一个 DAG,其中包含大约 10 个相互独立的任务,而我有另一个 DAG,只有在所有 10 个任务都运行
下面是 Airflow DAG 代码。当 Airflow 在本地托管和在云 Composer 上托管时,它都能完美运行。但是,DAG 本身在 Composer UI 中不可单击。我发现了一个类似的问题
我有兴趣在使用 https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-when-t
我有一个 DAG(有向无环图),其顶点具有黑色或白色两种颜色中的任何一种。我需要将尽可能多的黑色顶点与图形应保持非循环的约束合并在一起。因此最终的 DAG 应该有最小值。的黑色顶点。这个问题的最佳算法
我正在尝试根据用户输入在 Airflow 中生成动态工作流。我知道可以根据文件和数据库中的数据选择它,但在所有这些情况下,工作流不会直接依赖于用户输入,如果多个用户使用相同的 dag,那么在这种情况下
我正在尝试拥有一个主 dag,它将根据我的需要创建更多 dags。我在 airflow.cfg 的 dags_folder 中有以下 python 文件。此代码在数据库中创建主 dag。该主 dag
我根据教程在 dags 文件夹中放置了一个 dag 文件,稍作修改,但它没有显示在 GUI 中或运行 airflow dags list 时。 最佳答案 回答我自己的问题:通过直接运行来检查 pyth
我根据教程在 dags 文件夹中放置了一个 dag 文件,稍作修改,但它没有显示在 GUI 中或运行 airflow dags list 时。 最佳答案 回答我自己的问题:通过直接运行来检查 pyth
有调用主 dag 中不同 dags 的任务列表。我正在使用 TriggerDagrunoperator 来完成此操作。但面临一些问题。 TriggerDagrunoperator 不会等待外部 dag
我设置了 Airflow 并运行一些 DAG,计划每天一次“0 0 * * *”。 我想检查下一次安排运行特定 dag 的时间,但我看不到我可以在管理员中的什么地方执行此操作。 最佳答案 如果你想使用
我通过包管理器在我的计算机上安装了 llc 程序(当然我已经安装了 LLVM,6.0.0 版本)。另外,我从源代码构建了它。我想要的是查看由 llvm 生成的 DAG。但是,不幸的是,我在 llc-d
我在 spark 中有一个操作,应该对数据框中的几列执行。通常,有 2 种可能性来指定此类操作 硬编码 handleBias("bar", df) .join(handleBias("baz",
Airflow 似乎跳过了我添加到/usr/local/airflow/dags 的 dags。 当我跑 airflow list_dags 输出显示 [2017-08-06 17:03:47,220
非常喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到错误:“{jobs.py:538} 错误 - DAG 运行因 DAG 陷入僵局:TEST_SCHEDULER_DAG”。 这
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我想查找特定执行日期的特定 dag 的所有 dag 运行。 当我阅读文档时,有这个功能:dag_runs = DagRun.find(dag_id=self.dag_name, execution_d
我有一个 python DAG Parent Job和 DAG Child Job . Child Job中的任务应该在成功完成 Parent Job 时触发每天运行的任务。如何添加外部作业触发器?
我有一个由 TriggerDagRunOperator 触发的 DAG。它似乎运行良好,除非我尝试从 Airflow GUI 中“标记失败”或“标记成功”。当我这样做时,它总是尝试将更改应用到所有以前
Airflow 正在将所有 dags 加载到数据库中,但不会触发它们。 日志文件显示以下错误 [2020-01-05 02:55:06,226] {{dagbag.py:436}} [2020-0
我是一名优秀的程序员,十分优秀!