gpt4 book ai didi

Airflow - 损坏的 DAG - 超时

转载 作者:行者123 更新时间:2023-12-02 16:49:16 24 4
gpt4 key购买 nike

我有一个 DAG,它执行一个连接到 Postgres DB 的函数,删除表中的内容,然后插入新的数据集。

我正在本地尝试此操作,当我尝试运行此操作时,我发现网络服务器需要很长时间才能连接,并且在大多数情况下不会成功。然而,作为连接过程的一部分,它似乎是从后端执行查询。由于我有一个删除函数,即使我没有安排脚本或手动启动,我也会看到数据从表中删除(基本上是执行其中一个函数)。有人可以建议我在这方面做错了什么吗?

用户界面中弹出的一个错误是

损坏的 DAG:[/Users/user/airflow/dags/dwh_sample23.py] 超时

还可以在 UI 中的 dag id 旁边看到一个 i,表示该 DAG 在网络服务器的 DAG 对象中不可用。下面给出的是我正在使用的代码:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 21),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
global dwh_connection
try:
dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
print("I am unable to connect to the database.")
print('Success')
return(dwh_connection)

def tbl1_del():
''' This function takes clears all rows from tbl1 '''
cur = dwh_connection.cursor()
cur.execute("""DELETE FROM tbl1;""")
dwh_connection.commit()


def pop_tbl1():
''' This function populates all rows in tbl1 '''
cur = dwh_connection.cursor()
cur.execute(""" INSERT INTO tbl1
select id,name,price from tbl2;""")
dwh_connection.commit()



db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()

##########################################


t1 = BashOperator(
task_id='DB_Connect',
python_callable=db_login(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)

t2 = BashOperator(
task_id='del',
python_callable=tbl1_del(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)


t3 = BashOperator(
task_id='populate',
python_callable=pop_tbl1(),
bash_command='python3 ~/airflow/dags/dwh_sample23.py',
dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)

有人可以帮忙吗?谢谢。

最佳答案

而不是使用 BashOperator您可以使用PythonOperator并调用db_login() , tbl1_del() , pop_tbl1()PythonOperator

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2018, 5, 21),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
global dwh_connection
try:
dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
except:
print("I am unable to connect to the database.")
print('Success')
return(dwh_connection)

def tbl1_del():
''' This function takes clears all rows from tbl1 '''
cur = dwh_connection.cursor()
cur.execute("""DELETE FROM tbl1;""")
dwh_connection.commit()


def pop_tbl1():
''' This function populates all rows in tbl1 '''
cur = dwh_connection.cursor()
cur.execute(""" INSERT INTO tbl1
select id,name,price from tbl2;""")
dwh_connection.commit()



db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()

##########################################


t1 = PythonOperator(
task_id='DB_Connect',
python_callable=db_login(),
dag=dag)

t2 = PythonOperator(
task_id='del',
python_callable=tbl1_del(),
dag=dag)


t3 = PythonOperator(
task_id='populate',
python_callable=pop_tbl1(),
dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)

关于 Airflow - 损坏的 DAG - 超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50505490/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com