gpt4 book ai didi

python - 如何从谷歌云 Composer 调用云功能?

转载 作者:行者123 更新时间:2023-12-05 01:27:53 28 4
gpt4 key购买 nike

对于一个要求,我想从云 Composer 管道内部调用/调用一个云函数,但我找不到太多关于它的信息,我尝试使用 SimpleHTTP Airflow 运算符,但我得到了这个错误:

[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in
_run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in
_prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in
_execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/python3.8/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None

提前致谢!

最佳答案

我遇到了与您相同的问题,但我通过研究 Google 的 Airflow 2.0 提供程序包并改为使用 PythonOperator 设法解决了这个问题。

from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
import google.auth.transport.requests
from google.auth.transport.requests import AuthorizedSession

def invoke_cloud_function():

url = "<your_cloud_function_url>" #the url is also the target audience.
request = google.auth.transport.requests.Request() #this is a request for obtaining the the credentials
id_token_credentials = id_token_credential_utils.get_default_id_token_credentials(url, request=request) # If your cloud function url has query parameters, remove them before passing to the audience

resp = AuthorizedSession(id_token_credentials).request("GET", url=url) # the authorized session object is used to access the Cloud Function

print(resp.status_code) # should return 200
print(resp.content) # the body of the HTTP response

因此,调用函数如下:

    task = PythonOperator(task_id="invoke_cf", python_callable=invoke_cloud_function)

据我了解,访问经过身份验证的 HTTP Cloud Function 严格需要基于 ID token 的凭据。因此,为了获得所需的类型凭据,get_default_id_token_credentials() 执行应用程序默认凭据 (ADC) 授权流程,这是一个从环境变量、已知位置获取凭据的过程.或 Compute Engine 元数据服务器。 Composer 应该通过环境变量(可能是 GOOGLE_APPLICATION_CREDENTIALS)提供相关的服务帐户 key 文件。

一旦您拥有正确类型的凭据,您就可以使用 AuthorizedSessions 对象来验证您对云函数的请求。

关于python - 如何从谷歌云 Composer 调用云功能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69131840/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com