gpt4 book ai didi

python - 如果我在任务中发送 http 请求,为什么我的 Airflow 会挂起?

转载 作者:行者123 更新时间:2023-12-03 07:56:20 28 4
gpt4 key购买 nike

系统: MacOS Apple M1(本地计算机)

Airflow :2.5.3

执行器:本地 Postgres 数据库

我正在尝试实现一些外部触发的工作流程,这些工作流程从我们的 REST API 加载数据。我正在使用 Python Operator 运行代码并使用 Airflow UI 手动触发该流程。但是,当执行到包含发送 http 请求的代码的任务时,它会永远挂起,并且笔记本电脑开始运行得非常热。

enter image description here

enter image description here

该任务在另一个文件中定义,我将其作为模块导入。以下是任务文件(tasks/import_logs.py)的内容

import requests

def import_logs(**context):
print("[Sasha] Running log importer")
context["ti"].xcom_push(
key="logs", value=["log/location/1", "log/location/2"])
print("Log locations pushed to xcom")
# Define the URL for the dummy endpoint
url = 'https://jsonplaceholder.typicode.com/posts'

# Define the payload for the JSON request
payload = {
"title": "foo",
"body": "bar",
"userId": 1
}

# Define the headers for the request
headers = {'Content-Type': 'application/json'}

# Send the POST request to the dummy endpoint
response = requests.post(url, json=payload, headers=headers)

# Print the response status code and content
print(f'Response status code: {response.status_code}')
print(f'Response content: {response.content}')

这是 Dag 定义:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from tasks.import_logs import import_logs
from tasks.import_tops import import_tops
from tasks.process_input import process_input
from tasks.process_log_data import process_logs
from tasks.output_logs import output_logs
from tasks.cleanup import cleanup
from tasks.trigger_data_update import trigger_data_update


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 3, 31),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('process_log', default_args=default_args, schedule_interval=None)

validate_input = PythonOperator(
task_id='validate_input',
python_callable=process_input,
provide_context=True,
dag=dag
)

import_log = PythonOperator(
task_id='import_logs',
python_callable=import_logs,
provide_context=True,
dag=dag
)

import_top = PythonOperator(
task_id='import_tops',
python_callable=import_tops,
provide_context=True,
dag=dag
)

process_log = PythonOperator(
task_id='process_logs',
python_callable=process_logs,
provide_context=True,
dag=dag
)

output_log = PythonOperator(
task_id='write_logs',
python_callable=output_logs,
provide_context=True,
dag=dag
)

cleanup_task = PythonOperator(
task_id='cleanup',
python_callable=cleanup,
provide_context=True,
dag=dag
)

update_task = PythonOperator(
task_id='trigger_data_update',
python_callable=trigger_data_update,
provide_context=True,
dag=dag
)

validate_input >> [import_top, import_log] >> process_log >> output_log >> [cleanup_task, update_task]

if __name__ == "__main__":
import json
with open('test_conf/process_log.json', 'r') as f:
conf = json.load(f)
dag.test(
run_conf=conf
)

它开始发生在顺序执行器上。从那时起,我已将实例移至本地执行程序,删除了所有额外的代码,只留下了一个简单的虚拟请求来尝试缩小错误范围,但它一直在发生。

我还尝试使用带有连接的 Airflow HTTP Hook,这导致了相同的行为。读到它似乎从任务发送请求不应该成为问题,所以我放弃了这种方法。

此时,所有其他任务也是虚拟的,只是打印内容。在 DAG 文件底部运行测试可以正常进行,没有任何问题。我正在考虑在 Debug模式下运行 Airflow 并开始调试,但这会浪费很多时间。

最佳答案

我也和你一样遇到http请求挂出问题。

我的本​​地环境是airflow2.5.1/python3.8.13/M1 macOS。

这似乎是 macOS python 包问题。

我通过添加如下所示的环境来修复它:

export NO_PROXY="*"

https://github.com/apache/airflow/discussions/24463

希望这可以帮助你。

关于python - 如果我在任务中发送 http 请求,为什么我的 Airflow 会挂起?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75980623/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com