
rest - How to call a REST endpoint using an Airflow DAG

Reposted. Author: 行者123. Updated: 2023-12-02 18:03:45

I am new to Apache Airflow. I want to call a REST endpoint from a DAG.
For example, the REST endpoint is:

@PostMapping(path = "/api/employees", consumes = "application/json")

Now I want to call this REST endpoint from an Airflow DAG and schedule it. I am using SimpleHttpOperator to call the endpoint:
import json
from airflow.operators.http_operator import SimpleHttpOperator

t1 = SimpleHttpOperator(
    task_id='post_op',
    endpoint='http://localhost:8084/api/employees',
    data=json.dumps({"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

When I trigger the DAG, the task fails:
[2019-12-30 09:09:06,330] {{taskinstance.py:862}} INFO - Executing <Task(SimpleHttpOperator): 
post_op> on 2019-12-30T08:57:00.674386+00:00
[2019-12-30 09:09:06,331] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run',
'example_http_operator', 'post_op', '2019-12-30T08:57:00.674386+00:00', '--job_id', '6', '--pool',
'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ExampleHttpOperator.py', '--cfg_path',
'/tmp/tmpf9t6kzxb']
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,445] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from
/usr/local/airflow/dags/ExampleHttpOperator.py
[2019-12-30 09:09:07,473] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30
09:09:07,472] {{cli.py:545}} INFO - Running <TaskInstance: example_http_operator.post_op 2019-12-
30T08:57:00.674386+00:00 [running]> on host 855dbc2ce3a3
[2019-12-30 09:09:07,480] {{http_operator.py:87}} INFO - Calling HTTP method
[2019-12-30 09:09:07,483] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,483]
{{base_hook.py:84}} INFO - Using connection to: id: http_default. Host: https://www.google.com/,
Port: None, Schema: None, Login: None, Password: None, extra: {}
[2019-12-30 09:09:07,484] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,484]
{{http_hook.py:131}} INFO - Sending 'POST' to url:
https://www.google.com/http://localhost:8084/api/employees
[2019-12-30 09:09:07,501] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,501]
{{http_hook.py:181}} WARNING - HTTPSConnectionPool(host='www.google.com', port=443): Max retries
exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake:
SysCallError(-1, 'Unexpected EOF')"))) Tenacity will retry to execute the operation
[2019-12-30 09:09:07,501] {{taskinstance.py:1058}} ERROR -
HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url:
/http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1,
'Unexpected EOF')")))
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 485, in wrap_socket
cnx.do_handshake()
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1934, in do_handshake
self._raise_ssl_error(self._ssl, result)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1664, in _raise_ssl_error
raise SysCallError(-1, "Unexpected EOF")
OpenSSL.SSL.SysCallError: (-1, 'Unexpected EOF')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 394, in connect
ssl_context=context,
File "/usr/local/lib/python3.7/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
return context.wrap_socket(sock, server_hostname=server_hostname)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 491, in wrap_socket
raise ssl.SSLError("bad handshake: %r" % e)
ssl.SSLError: ("bad handshake: SysCallError(-1, 'Unexpected EOF')",)

Airflow is running on Docker, and the Docker image is puckel/docker-airflow.
Why is it using the connection http_default, with Host: https://www.google.com/?

Best answer

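You need to consider both the Operator you are using and the underlying Hook it uses to connect. The Hook gets its connection information from an Airflow Connection, which is just a container for storing credentials and other connection information. You can configure Connections in the Airflow UI (Airflow UI -> Admin -> Connections).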

So in this case, you need to configure your HTTP Connection first.

From the http_hook documentation:

http_conn_id (str) – connection that has the base API url i.e https://www.google.com/

As it happens, for the HttpHook you should configure the Connection by setting its host parameter to the base URL of your endpoint: http://localhost:8084/.
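To make the role of the host field concrete, here is a minimal sketch of how a base URL could be derived from the fields of a Connection. It is modeled on the behavior visible in the log above and is not the actual Airflow source; `build_base_url` and its parameters are hypothetical names used only for illustration, and the real hook may differ between versions.

```python
def build_base_url(host, schema=None, port=None):
    """Hypothetical helper: derive a base URL from Connection-like fields."""
    if host and "://" in host:
        # The host already carries a scheme, so it is used as-is.
        base_url = host
    else:
        # Otherwise a scheme is prepended; defaulting to http is an assumption.
        base_url = (schema or "http") + "://" + (host or "")
    if port:
        # A configured port is appended to whatever was built so far.
        base_url = base_url + ":" + str(port)
    return base_url


# The default connection from the log, then two ways to describe the local API.
print(build_base_url("https://www.google.com/"))
print(build_base_url("http://localhost:8084/"))
print(build_base_url("localhost", port=8084))
```

Under these assumptions, a host that already contains the full base URL (scheme and port included) should be paired with an empty port field, since the port would otherwise be appended a second time.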

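Since your operator uses the default http_conn_id, the hook will use the Airflow Connection named "http_default" in the Airflow UI.
If you do not want to change the default one, you can create another Airflow Connection in the Airflow UI and pass its conn_id to your operator through the http_conn_id parameter.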

Take a look at the source code to get a better idea of how the Connection object is used.

Finally, according to the http_operator documentation:
endpoint (str) – The relative part of the full url. (templated)

You should pass only the relative part of the URL to the operator. The rest is taken from the underlying http_hook.
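The malformed URL in the log follows directly from this split between base URL and relative endpoint. The sketch below is a simplified model of the joining step, written to reproduce what the log shows; `join_url` is a hypothetical name and the code is not copied from Airflow.

```python
def join_url(base_url, endpoint):
    """Hypothetical helper: combine a base URL with a relative endpoint."""
    needs_slash = (
        base_url
        and not base_url.endswith("/")
        and endpoint
        and not endpoint.startswith("/")
    )
    if needs_slash:
        # Insert the missing separator between the two parts.
        return base_url + "/" + endpoint
    # Otherwise the two parts are simply concatenated.
    return (base_url or "") + (endpoint or "")


# Default connection host plus a full URL as endpoint: the broken URL from the log.
print(join_url("https://www.google.com/", "http://localhost:8084/api/employees"))

# Connection host set to the API's base URL plus a relative endpoint.
print(join_url("http://localhost:8084/", "api/employees"))
```

The first call yields https://www.google.com/http://localhost:8084/api/employees, the same URL the task tried to reach; the second yields http://localhost:8084/api/employees.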

In this case, the value of endpoint for your Operator should be api/employees (not the full URL).
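Putting both points together, the operator arguments from the question might be adjusted as sketched below. The arguments are kept in a plain dictionary so the snippet runs without Airflow installed. The connection id "employees_api" is a hypothetical name for a Connection you would create yourself, with its host set to http://localhost:8084/.

```python
import json

# Keyword arguments for SimpleHttpOperator, adjusted as the answer suggests.
post_employee_kwargs = {
    "task_id": "post_op",
    # Hypothetical Connection id; omit this key to fall back to "http_default".
    "http_conn_id": "employees_api",
    # Only the relative part of the URL; the base URL comes from the Connection.
    "endpoint": "api/employees",
    "method": "POST",
    "data": json.dumps(
        {"department": "Digital", "id": 102, "name": "Rakesh", "salary": 80000}
    ),
    "headers": {"Content-Type": "application/json"},
}

# In the DAG file, with `dag` defined as in the question:
# t1 = SimpleHttpOperator(dag=dag, **post_employee_kwargs)
```

If you instead edit the existing "http_default" Connection so that its host points at your API, the http_conn_id entry can be left out.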

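Unfortunately, the Airflow project documentation is not very clear on this point. Please consider contributing an improvement; contributions are always welcome :)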

Regarding "rest - How to call a REST endpoint using an Airflow DAG", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/59574331/
