gpt4 book ai didi

python - Apache Airflow 卡在执行最后一个任务的循环中(bash 运算符执行 python 脚本)

转载 作者:行者123 更新时间:2023-12-05 04:34:40 25 4
gpt4 key购买 nike

我在本地机器上的 docker 容器中运行 Airflow。我正在运行一个测试 DAG 来执行 3 个任务。这三个任务运行良好,但是,最后一个使用 bash 操作符的任务卡在了一个循环中,如底部图片所示。查看日志文件,只为第一次执行 bash python 脚本生成一个条目,然后什么都没有,但是 python 文件一直在执行。关于可能是什么问题的任何建议?

谢谢,

理查德

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def creating_dataframe(ti):
import pandas as pd
import os

loc = r'/opt/airflow/dags/'
filename = r'demo.csv'
df_location = loc + filename
ti.xcom_push(key='df_location', value=df_location)

if os.path.exists(loc + filename):
print("if exists")
return df_location

else:
df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['First entry']},
index = [pd.Timestamp.now()])
df.to_csv(loc + filename, sep=';')
print("does not exist")

return df_location


def adding_row_to_dataframe(ti):
import pandas as pd
fetched_location = ti.xcom_pull(key='df_location', task_ids=['creating_dataframe'])[0]


df = pd.read_csv(fetched_location,index_col=0,sep=';')
new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry to demo file']},
index = [pd.Timestamp.now()])
df2 = pd.concat([df,new_df])
df2.to_csv(fetched_location,sep=";")
print("second function")

with DAG(
dag_id="richards_airflow_demo",
schedule_interval="@once",
start_date=datetime(2022, 2, 17 ),
catchup=False,
tags=["this is a demo of airflow","adding row"],
) as dag:

task1 = PythonOperator(
task_id="creating_dataframe",
python_callable=creating_dataframe,
do_xcom_push=True
)


task2 = PythonOperator(
task_id='adding_row_to_dataframe',
python_callable=adding_row_to_dataframe


)

task3 = BashOperator(
task_id='python_bash_script',
bash_command=r"echo 'python /opt/scripts/test.py'"
)


task1 >> task2 >> task3

Bash python 脚本:

import pandas as pd

df = pd.read_csv('/opt/airflow/dags/demo.csv',index_col=0,sep=';')
new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
index = [pd.Timestamp.now()])
df2 = pd.concat([df,new_df])

df2.to_csv('/opt/airflow/dags/demo.csv',sep=';')

Example of issue Log file for bashoperator

最佳答案

好吧,没有研究为什么会这样,但似乎如果我在 dags 文件夹中创建一个脚本文件夹,即使 bashoperator 不存在,也会执行里面的 python 脚本 (test_dontputthescripthere.py) t 告诉它执行。如您所见,bashoperator 正在完美地执行 test.py 文件,并将以下行添加到 csv:

2022-02-21 15:11:53.923284;使用 bash python 脚本添加条目

test_dontputthescripthere.py 在循环中执行,没有 bashoperator 执行文件。这是 demo.csv 文件中的所有“- 这是错误的”条目。

我怀疑 Airflow 内部正在进行某种刷新,迫使它执行 python 文件。

Solution

关于python - Apache Airflow 卡在执行最后一个任务的循环中(bash 运算符执行 python 脚本),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71171886/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com