gpt4 book ai didi

https - Airflow 中用于 HTTPS 的 HttpOperator 或 HttpHook

转载 作者:行者123 更新时间:2023-12-03 16:50:38 28 4
gpt4 key购买 nike

我正在对 Google Cloud 上的 Airflow 进行一些概念验证。

本质上,我想创建一个工作流,从 REST API (https) 下载数据,将此数据转换为 JSON 格式,然后将其上传到 Google Cloud 存储单元。

我已经用纯 Python 代码完成了这项工作,并且可以正常工作。很简单!但是因为我想安排这个并且有一些依赖关系,所以 Airflow 应该是这个的理想工具。

在仔细阅读 Airflow 文档后,我发现 HttpOperator 和/或 HttpHook 可以解决下载部分的问题。

我已经使用我的电子邮件/密码创建了与 WebUI 的 Http 连接,用于授权,如下所示:

{Conn ID:“atlassian_marketplace”,Conn 类型:“HTTP”,主机:“https://marketplace.atlassian.com/rest/2”,架构:无/空白,登录:“我的用户名”,密码:“我的密码”,端口:无/空白,额外:无/空白}

第一个问题:
- 何时使用 SimpleHttpOperator 与 HttpHook?

第二个问题:
- 我们如何在 HTTPs 调用中使用 SimpleHttpOperator 或 HttpHook?

第三个问题:
- 我们如何访问 API 调用返回的数据?

在我的例子中,XCOM 功能无法解决问题,因为这些 API 调用会返回大量数据(100-300mb)!

我在谷歌上找到了一个关于如何在我的用例中使用operator/hook的示例代码,但我还没有找到任何有用的东西。

有任何想法吗?

到目前为止,我把我的代码框架放在这里。

# Usual Airflow import

# Dag creation
dag = DAG(
'get_reporting_links',
default_args=default_args,
description='Get reporting links',
schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
http_conn_id="atlassian_marketplace",
endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}),
method="GET")

# Task 3: Save JSON data locally
# TODO: transform_json: transform to JSON get_data.json()?

# Task 4: Upload data to GCP
# TODO: upload_gcs: use Airflow GCS connection

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> transform_json >> upload_gcs >> stop

最佳答案

看下面的例子:

# Usual Airflow import

# Dag creation
dag = DAG(
'get_reporting_links',
default_args=default_args,
description='Get reporting links',
schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
task_id="get_data",
http_conn_id="atlassian_marketplace",
endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}),
method="GET",
xcom_push=True,
)

def transform_json(**kwargs):
ti = kwargs['ti']
pulled_value_1 = ti.xcom_pull(key=None, task_ids='get_data')
...
# transform the json here and save the content to a file


# Task 3: Save JSON data locally
save_and_transform = PythonOperator(
task_id="save_and_transform",
python_callable=transform_json,
provide_context=True,
)

# Task 4: Upload data to GCP
upload_to_gcs = FileToGoogleCloudStorageOperator(...)

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> save_and_transform >> upload_to_gcs >> stop

关于https - Airflow 中用于 HTTPS 的 HttpOperator 或 HttpHook,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58224068/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com