- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
所以我有一个任务的测试 dag,这是简单的 ETL 尝试从 mssql db 中提取数据并将它们加载到 postgres db。所以它的工作方式是按日期选择并在过去 360 天内插入到 postgres 数据库。但是在 10 天左右之后,该任务在 select 语句上超时。
def get_receiveCars(**kwargs):
#get current date
end_date = datetime.now()
#loop for 360 days
for x in range(360):
startDate = today - timedelta(days=x)
delete_dataPostgres(startDate.strftime('%Y-%m-%d'), "received sample")
select_dataMsql(startDate)
选择语句是:
def select_dataMsql(startDate):
#insert data
endDate = str(startDate.strftime('%Y-%m-%d')) + " 23:59:59"
ms_hook = MsSqlHook(mssql_conn_id='mssql_db')
select_sql="""select carColor, carBrand, fuelType, COUNT(DISTINCT RequestID ) AS received
FROM Requests
where
ReceivedDateTime >= %s
AND ReceivedDateTime< %s
GROUP BY carColor, carBrand, fuelType"""
cond = (startDate, endDate)
results =ms_hook.get_records(select_sql, parameters=cond)
insert_data(results, startDate)
这是我的狗
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from src.get_receiveCars import get_receiveCars
#from src.transform_data import transform_data
#from src.load_table import load_table
import requests
import json
import os
# Define the default dag arguments.
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': XXXXX,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=1)
}
# Define the dag, the start date and how frequently it runs.
# I chose the dag to run everday by using 1440 minutes.
dag = DAG(
dag_id='reveive_sample',
default_args=default_args,
dagrun_timeout=timedelta(minutes=200),
schedule_interval= '@daily',
start_date=datetime(2020, 10, 30))
# First task is to query get the weather from openweathermap.org.
mid_task = PythonOperator(
task_id='get_receiveCars',
provide_context=True,
python_callable=get_receiveCars,
dag=dag)
# Set task1
mid_task
日志
- Start syncing user roles.
[2020-10-30 18:29:40,238] {timeout.py:42} ERROR - Process timed out, PID: 84214
[2020-10-30 18:29:40,238] {dagbag.py:259} ERROR - Failed to import: /root/airflow/dags/receive_sample.py
Traceback (most recent call last):
File "/root/airflow/lib/python3.6/site-packages/airflow/models/dagbag.py", line 256, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 684, in _load
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/root/airflow/dags/receive_sample.py", line 5, in <module>
from src.get_receiveCars import get_receiveCars
File "/root/airflow/dags/src/get_receiveCars.py", line 56, in <module>
get_receiveCars()
File "/root/airflow/dags/src/get_receiveCars.py", line 17, in get_receiveCars
delete_data(startDate.strftime('%Y-%m-%d'), "received cars")
File "/root/airflow/dags/src/get_receiveCars.py", line 26, in delete_data
pg_hook.run(delete_sql, parameters=cond)
File "/root/airflow/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py", line 172, in run
cur.execute(s, parameters)
File "/usr/local/lib/python3.6/encodings/utf_8.py", line 15, in decode
def decode(input, errors='strict'):
File "/root/airflow/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 84214
[2020-10-30 18:29:40,260] {security.py:477} INFO - Start syncing user roles.
[2020-10-30 18:29:40,350] {security.py:477} INFO - Start syncing user roles.
[2020-10-30 18:29:40,494] {security.py:387} INFO - Fetching a set of all permission, view_menu from FAB meta-table
[2020-10-30 18:29:40,550] {security.py:387} INFO - Fetching a set of all permission, view_menu from FAB meta-table
[2020-10-30 18:29:40,639] {security.py:387} INFO - Fetching a set of all per
最佳答案
检查您的配置:
airflow config list|grep -i timeout
dagbag_import_timeout = 30
dag_file_processor_timeout = 50
web_server_master_timeout = 120
web_server_worker_timeout = 120
log_fetch_timeout_sec = 5
smtp_timeout = 30
operation_timeout = 1.0
task_adoption_timeout = 600
您需要更改 dagbag_import_timeout 设置,以便有时间加载您的 dag。
export AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=300
关于airflow - Apache Airflow 任务超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64611734/
在Airflow中,我一直在使用“airflow run”和“airflow test”,但不完全理解它们有何不同。他们有什么区别? 最佳答案 我自己通读了文档,发现它是多么令人困惑。 Airflow
我使用 Airflow 已经有一段时间了,它是由一位同事创建的。最近我遇到了一些错误,这需要我更深入地了解如何修复 Airflow 中的某些问题。 我确实理解这三个进程是什么,但我只是不明白运行它们时
AIRFLOW_HOME=/path/to/my/airflow_home 我收到这个警告... >airflow trigger_dag python_dag3 /Users/alexryan/mi
有没有人报告过他们在他们的公司中让 Airflow 扩展了多少?我正在考虑实现 Airflow 来执行 5,000 多个任务,每个任务每小时运行一次,有一天可以将其扩展到 20,000 多个任务。在检
问题 :我想使用 Github 上最新版本的 Apache-Airflow 安装 apache-airflow 以及所有依赖项? 我怎样才能使用 pip 做到这一点? 在生产环境中使用它是否安全? 最
我们在 AWS ECS 上运行 Airflow,并将所有 DAG 捆绑在一个 Docker 镜像中。我们不时更新 DAGS,并部署新版本的 Docker Image。当我们这样做时,ECS 将终止正在
问题很简单。我需要限制 Airflow 网络用户仅查看和执行某些 DAG 和任务。 如果可能,我宁愿不使用 Kerberos也不是 OAuth . Multi-tenancy option 似乎是一个
我们正在使用 Airflow 2.00。我正在尝试实现一个做两件事的 DAG: 通过 API 触发报告 从源到目标下载报告。 任务 1 和任务 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有
对于一项任务,有许多辅助任务 - 从文件/数据库中获取/保存属性、验证、审计。这些辅助方法并不耗时。 一个示例 DAG 流, fetch_data >> actual_processing >> va
有什么方法可以重新加载作业而不必重新启动服务器吗? 最佳答案 在airflow.cfg中,您具有以下两种配置来控制此行为: # after how much time a new DAGs shoul
我们可以通过将任务/dag 超时设置为 None 并手动触发其运行来使用 Airflow dag 来定义永无止境的作业(即具有无条件循环以消耗流数据的任务)吗?让 Airflow 监测永无止境的任务会
我是 Airflow 的新手,最近开始探索这个工具。我在 18.4 版本的 ubuntu 机器上安装了 1.10.10 版。从设置和安装的角度来看,一切正常,但是我在任何 DAG 中的任务都没有运行,
我主要看到Airflow被用于ETL / Bid数据相关的工作。我正在尝试将其用于业务工作流,其中用户操作将来会触发一组相关任务。其中某些任务可能需要根据某些其他用户操作来清除(删除)。 我认为最好的
我有一个 DAG,只要 FileSensor 检测到文件,它就会使用它,为每个文件生成任务,以 (1) 将文件移动到暂存区域,(2) 触发单独的 DAG 来处理文件。 FileSensor -> Mo
我需要手动或以编程方式执行的管道,可以使用 Airflow 吗?看起来现在每个工作流程都必须与时间表绑定(bind)。 最佳答案 只需在创建 DAG 时将 schedule_interval 设置为
所以这是一个愚蠢的想法...... 我在 Airflow 中创建了(许多)DAG...并且它有效...但是,我想以某种方式将其打包,以便我可以在不安装 Airflow 的情况下运行单个 DAG 运行;
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我们正在尝试将 MongoHook 和 GCSToLocalFilesystemOperator 导入到我们的 Airflow 项目中: docs for MongoHook docs for GCS
启动 Airflow 网络服务器时出现以下错误 balajee@Balajees-MacBook-Air.local:~$ Airflow 网络服务器 -p 8080 [2018-12-03 00:2
运行pip install airflow[postgres]命令后出现以下错误: > raise RuntimeError("By default one of Airflow's dependen
我是一名优秀的程序员,十分优秀!