airflow - Apache Airflow task timeout


I have a test DAG with a single task: a simple ETL that extracts data from an MSSQL database and loads it into a Postgres database. It works day by day: for each of the last 360 days it selects that day's data and inserts it into Postgres. But after about 10 days' worth of iterations, the task times out on the select statement.

def get_receiveCars(**kwargs):
    # get the current date
    end_date = datetime.now()
    # loop over the last 360 days, one day at a time
    for x in range(360):
        startDate = end_date - timedelta(days=x)
        # clear any previously loaded rows for that day, then re-select
        delete_dataPostgres(startDate.strftime('%Y-%m-%d'), "received sample")
        select_dataMsql(startDate)

The select function is:
def select_dataMsql(startDate):
    # select one day's aggregates from MSSQL, then hand them to the insert step
    endDate = str(startDate.strftime('%Y-%m-%d')) + " 23:59:59"
    ms_hook = MsSqlHook(mssql_conn_id='mssql_db')
    select_sql = """SELECT carColor, carBrand, fuelType, COUNT(DISTINCT RequestID) AS received
                    FROM Requests
                    WHERE ReceivedDateTime >= %s
                      AND ReceivedDateTime < %s
                    GROUP BY carColor, carBrand, fuelType"""
    cond = (startDate, endDate)
    results = ms_hook.get_records(select_sql, parameters=cond)
    insert_data(results, startDate)
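
insert_data itself is not shown in the question. For completeness, a minimal sketch of what it might look like, assuming a PostgresHook with a hypothetical postgres_db connection and a hypothetical received_cars target table (neither name appears in the original):

from airflow.hooks.postgres_hook import PostgresHook

def insert_data(results, startDate):
    # Hypothetical sketch: append one day's aggregates to Postgres,
    # tagging each row with the date it belongs to.
    pg_hook = PostgresHook(postgres_conn_id='postgres_db')
    rows = [(startDate.strftime('%Y-%m-%d'),) + tuple(r) for r in results]
    pg_hook.insert_rows(
        table='received_cars',
        rows=rows,
        target_fields=['report_date', 'carColor', 'carBrand', 'fuelType', 'received'])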
And here is my DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from src.get_receiveCars import get_receiveCars
#from src.transform_data import transform_data
#from src.load_table import load_table
import requests
import json
import os


# Define the default dag arguments.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': XXXXX,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}


# Define the dag, the start date and how frequently it runs.
# The dag runs once a day via the '@daily' schedule.
dag = DAG(
    dag_id='reveive_sample',
    default_args=default_args,
    dagrun_timeout=timedelta(minutes=200),
    schedule_interval='@daily',
    start_date=datetime(2020, 10, 30))


# The only task: pull the received-cars data and load it into Postgres.
mid_task = PythonOperator(
    task_id='get_receiveCars',
    provide_context=True,
    python_callable=get_receiveCars,
    dag=dag)

# Set the task
mid_task
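
Note that dagrun_timeout above bounds the whole DAG run, not this task. If the intent is to bound the task itself, BaseOperator also accepts an execution_timeout argument; a minimal sketch, with the two-hour limit being an assumed value:

mid_task = PythonOperator(
    task_id='get_receiveCars',
    provide_context=True,
    python_callable=get_receiveCars,
    execution_timeout=timedelta(hours=2),  # assumed limit, tune to the workload
    dag=dag)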
Logs:
- Start syncing user roles.
[2020-10-30 18:29:40,238] {timeout.py:42} ERROR - Process timed out, PID: 84214
[2020-10-30 18:29:40,238] {dagbag.py:259} ERROR - Failed to import: /root/airflow/dags/receive_sample.py
Traceback (most recent call last):
File "/root/airflow/lib/python3.6/site-packages/airflow/models/dagbag.py", line 256, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 684, in _load
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/root/airflow/dags/receive_sample.py", line 5, in <module>
from src.get_receiveCars import get_receiveCars
File "/root/airflow/dags/src/get_receiveCars.py", line 56, in <module>
get_receiveCars()
File "/root/airflow/dags/src/get_receiveCars.py", line 17, in get_receiveCars
delete_data(startDate.strftime('%Y-%m-%d'), "received cars")
File "/root/airflow/dags/src/get_receiveCars.py", line 26, in delete_data
pg_hook.run(delete_sql, parameters=cond)
File "/root/airflow/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py", line 172, in run
cur.execute(s, parameters)
File "/usr/local/lib/python3.6/encodings/utf_8.py", line 15, in decode
def decode(input, errors='strict'):
File "/root/airflow/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 84214
[2020-10-30 18:29:40,260] {security.py:477} INFO - Start syncing user roles.
[2020-10-30 18:29:40,350] {security.py:477} INFO - Start syncing user roles.
[2020-10-30 18:29:40,494] {security.py:387} INFO - Fetching a set of all permission, view_menu from FAB meta-table
[2020-10-30 18:29:40,550] {security.py:387} INFO - Fetching a set of all permission, view_menu from FAB meta-table
[2020-10-30 18:29:40,639] {security.py:387} INFO - Fetching a set of all per
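
Worth noting from the traceback: the import fails because get_receiveCars.py line 56 calls get_receiveCars() at module level, so the entire 360-day loop runs every time the scheduler parses the DAG file. A minimal guard, assuming that call was only meant for local testing:

# end of get_receiveCars.py
if __name__ == "__main__":
    # run the ETL only when this file is executed directly,
    # not when Airflow imports the module to build the DagBag
    get_receiveCars()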

Best Answer

Check your configuration:

airflow config list|grep -i timeout
dagbag_import_timeout = 30
dag_file_processor_timeout = 50
web_server_master_timeout = 120
web_server_worker_timeout = 120
log_fetch_timeout_sec = 5
smtp_timeout = 30
operation_timeout = 1.0
task_adoption_timeout = 600
You need to increase the dagbag_import_timeout setting so the scheduler has enough time to load your DAG.
To do so, update your airflow.cfg file or set an environment variable:
export AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=300
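
The equivalent airflow.cfg change (the value is in seconds; 300 here mirrors the export above):

[core]
dagbag_import_timeout = 300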

For this question about Apache Airflow task timeouts, see the similar question on Stack Overflow: https://stackoverflow.com/questions/64611734/
