- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Airflow 2.0.0,我的任务在运行几秒钟或几分钟后偶尔会被“外部”杀死。这些任务通常会成功运行(对于通过 airflow tasks test ...
启动的手动任务和计划的 DAG 运行),所以我相信这与我的 DAG 代码无关。
当任务失败时,这似乎是任务日志中的关键错误:
{local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 11:26:11,448] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,473] {taskinstance.py:1018} INFO - Starting attempt 3 of 3
[2020-12-20 11:26:11,473] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,506] {taskinstance.py:1038} INFO - Executing <Task(PythonOperator): run_backupper> on 2020-12-19T02:00:00+00:00
[2020-12-20 11:26:11,509] {standard_task_runner.py:51} INFO - Started process 12059 to run task
[2020-12-20 11:26:11,515] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--job-id', '22', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/backupper/daily_backups.py', '--cfg-path', '/tmp/tmpnfmqtorg']
[2020-12-20 11:26:11,517] {standard_task_runner.py:76} INFO - Job 22: Subtask run_backupper
[2020-12-20 11:26:11,609] {logging_mixin.py:103} INFO - Running <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [running]> on host localhost
[2020-12-20 11:26:11,742] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=<user>
AIRFLOW_CTX_DAG_ID=daily_backups
AIRFLOW_CTX_TASK_ID=run_backupper
AIRFLOW_CTX_EXECUTION_DATE=2020-12-19T02:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-12-19T02:00:00+00:00
...
... my job's logs, indicating that the job is running healthily ...
...
[2020-12-20 11:26:16,587] {local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 11:26:16,593] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 12059
[2020-12-20 11:27:16,609] {process_utils.py:108} WARNING - process psutil.Process(pid=12059, name='airflow task runner: daily_backups run_backupper 2020-12-19T02:00:00+00:00 22', status='sleeping', started='11:26:11') did not respond to SIGTERM. Trying SIGKILL
[2020-12-20 11:27:16,618] {process_utils.py:61} INFO - Process psutil.Process(pid=12059, name='airflow task runner: daily_backups run_backupper 2020-12-19T02:00:00+00:00 22', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='11:26:11') (12059) terminated with exit code Negsignal.SIGKILL
[2020-12-20 11:27:16,618] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGKILL
日志中的最后几行不一致。这是一个不同的版本,用于在早期尝试中失败的同一任务:
... same stuff as before ...
[2020-12-20 02:01:12,689] {local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 02:01:12,695] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 24442
[2020-12-20 02:02:00,462] {taskinstance.py:1214} ERROR - Received SIGTERM. Terminating subprocesses.
[2020-12-20 02:02:00,498] {process_utils.py:61} INFO - Process psutil.Process(pid=24442, status='terminated', exitcode=0, started='02:00:10') (24442) terminated with exit code 0
[2020-12-20 02:02:00,499] {local_task_job.py:118} INFO - Task exited with return code 0
我怀疑在这种情况下,脚本能够及时响应 SIGTERM,而在前一种情况下,它在长时间运行的查询中被阻止并且无法干净地终止。
最佳答案
我相信问题在于 调度程序健康检查阈值设置为小于调度程序心跳间隔。
在我的配置中,我设置了 scheduler_health_check_threshold
到 30 秒和 scheduler_heartbeat_sec
到 60 秒。在检查孤立任务(本身由不同的参数 orphaned_tasks_check_interval
控制)期间,调度程序心跳被确定为超过 30 秒,这是有道理的,因为它每 60 秒才心跳一次。因此,调度程序被推断为不健康并因此被终止。
大约在失败的时候,我可以在 /var/log/syslog
中看到这样的消息
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,368] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,373] {scheduler_job.py:1764} INFO - Marked 1 SchedulerJob instances as failed
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,381] {scheduler_job.py:1805} INFO - Reset the following 1 orphaned TaskInstances:
Dec 20 11:26:14 localhost bash[11545]: #011<TaskInstance: daily_backups.run_backupper 2020-12-19 02:00:00+00:00 [running]>
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,571] {scheduler_job.py:938} INFO - 1 tasks up for execution:
Dec 20 11:26:14 localhost bash[11545]: #011<TaskInstance: daily_backups.run_backupper 2020-12-19 02:00:00+00:00 [scheduled]>
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,574] {scheduler_job.py:972} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,575] {scheduler_job.py:999} INFO - DAG daily_backups has 0/16 running and queued tasks
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,575] {scheduler_job.py:1060} INFO - Setting the following tasks to queued state:
Dec 20 11:26:14 localhost bash[11545]: #011<TaskInstance: daily_backups.run_backupper 2020-12-19 02:00:00+00:00 [scheduled]>
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,578] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='daily_backups', task_id='run_backupper', execution_date=datetime.datetime(2020, 12, 19, 2, 0, tzinfo=Timezone('UTC')), try_number=4) to executor with priority 2 and queue default
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,578] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/storage/airflow/dags/backupper/daily_backups.py']
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,581] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/storage/airflow/dags/backupper/daily_backups.py']
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,707] {dagbag.py:440} INFO - Filling up the DagBag from /storage/airflow/dags/backupper/daily_backups.py
Dec 20 11:26:15 localhost bash[11545]: Running <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]> on host localhost
并且时间戳与我的任务收到的 SIGTERM 非常吻合。我猜因为 SchedulerJob 被标记为失败,那么运行我的实际任务的 TaskInstance 被认为是孤立的,因此被标记为终止。同时它安排了新的尝试(
try_number=4
)。
scheduler_health_check_threshold
到 120 秒并重新启动调度程序/网络服务器服务似乎解决了我的问题。
关于airflow - 为什么我的 Airflow 任务是 "externally set to failed"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65380492/
在Airflow中,我一直在使用“airflow run”和“airflow test”,但不完全理解它们有何不同。他们有什么区别? 最佳答案 我自己通读了文档,发现它是多么令人困惑。 Airflow
我使用 Airflow 已经有一段时间了,它是由一位同事创建的。最近我遇到了一些错误,这需要我更深入地了解如何修复 Airflow 中的某些问题。 我确实理解这三个进程是什么,但我只是不明白运行它们时
AIRFLOW_HOME=/path/to/my/airflow_home 我收到这个警告... >airflow trigger_dag python_dag3 /Users/alexryan/mi
有没有人报告过他们在他们的公司中让 Airflow 扩展了多少?我正在考虑实现 Airflow 来执行 5,000 多个任务,每个任务每小时运行一次,有一天可以将其扩展到 20,000 多个任务。在检
问题 :我想使用 Github 上最新版本的 Apache-Airflow 安装 apache-airflow 以及所有依赖项? 我怎样才能使用 pip 做到这一点? 在生产环境中使用它是否安全? 最
我们在 AWS ECS 上运行 Airflow,并将所有 DAG 捆绑在一个 Docker 镜像中。我们不时更新 DAGS,并部署新版本的 Docker Image。当我们这样做时,ECS 将终止正在
问题很简单。我需要限制 Airflow 网络用户仅查看和执行某些 DAG 和任务。 如果可能,我宁愿不使用 Kerberos也不是 OAuth . Multi-tenancy option 似乎是一个
我们正在使用 Airflow 2.00。我正在尝试实现一个做两件事的 DAG: 通过 API 触发报告 从源到目标下载报告。 任务 1 和任务 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有
对于一项任务,有许多辅助任务 - 从文件/数据库中获取/保存属性、验证、审计。这些辅助方法并不耗时。 一个示例 DAG 流, fetch_data >> actual_processing >> va
有什么方法可以重新加载作业而不必重新启动服务器吗? 最佳答案 在airflow.cfg中,您具有以下两种配置来控制此行为: # after how much time a new DAGs shoul
我们可以通过将任务/dag 超时设置为 None 并手动触发其运行来使用 Airflow dag 来定义永无止境的作业(即具有无条件循环以消耗流数据的任务)吗?让 Airflow 监测永无止境的任务会
我是 Airflow 的新手,最近开始探索这个工具。我在 18.4 版本的 ubuntu 机器上安装了 1.10.10 版。从设置和安装的角度来看,一切正常,但是我在任何 DAG 中的任务都没有运行,
我主要看到Airflow被用于ETL / Bid数据相关的工作。我正在尝试将其用于业务工作流,其中用户操作将来会触发一组相关任务。其中某些任务可能需要根据某些其他用户操作来清除(删除)。 我认为最好的
我有一个 DAG,只要 FileSensor 检测到文件,它就会使用它,为每个文件生成任务,以 (1) 将文件移动到暂存区域,(2) 触发单独的 DAG 来处理文件。 FileSensor -> Mo
我需要手动或以编程方式执行的管道,可以使用 Airflow 吗?看起来现在每个工作流程都必须与时间表绑定(bind)。 最佳答案 只需在创建 DAG 时将 schedule_interval 设置为
所以这是一个愚蠢的想法...... 我在 Airflow 中创建了(许多)DAG...并且它有效...但是,我想以某种方式将其打包,以便我可以在不安装 Airflow 的情况下运行单个 DAG 运行;
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我们正在尝试将 MongoHook 和 GCSToLocalFilesystemOperator 导入到我们的 Airflow 项目中: docs for MongoHook docs for GCS
启动 Airflow 网络服务器时出现以下错误 balajee@Balajees-MacBook-Air.local:~$ Airflow 网络服务器 -p 8080 [2018-12-03 00:2
运行pip install airflow[postgres]命令后出现以下错误: > raise RuntimeError("By default one of Airflow's dependen
我是一名优秀的程序员,十分优秀!