gpt4 book ai didi

Airflow - 如何处理异步 API 调用?

转载 作者:行者123 更新时间:2023-12-04 07:30:45 53 4
gpt4 key购买 nike

我正在尝试弄清楚如何最好地解决以下问题。本质上,我有一个外部 API 服务,我向其发送请求并获取结果。

POST = 发送请求,您得到的响应是一个 URL,您可以将其用于 GET 请求以检索结果。

GET = 轮询从 POST 请求返回的 URL,直到获得成功的结果。

在 Airflow 中解决这个问题的最佳方法是什么?我的想法是让 2 个任务并行运行。

  1. 发送 POST 请求,然后将响应 URL 保存到 XCOM。
  2. 另一个将在 while 循环中持续运行,从 XCOM 存储中读取新的 URL 响应并获取响应。一旦从该 URL 检索到成功结果,它就会从 XCOM 存储中删除。

您认为这是正确的做法吗?或者我应该在 python 中使用 asyncio 库吗?

非常感谢任何帮助

谢谢,

最佳答案

您可以使用 Airflow 中的 SimpleHttpOperatorHttpSensor 实现您所描述的内容(无需安装任何额外的包)。

考虑这个使用 http_default 连接到 http bin 的例子.

执行POST请求的任务:

task_post_op = SimpleHttpOperator(
task_id='post_op',
# http_conn_id='your_conn_id',
endpoint='post',
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()['json']['priority'] == 5,
response_filter=lambda response: 'get', # e.g lambda response: json.loads(response.text)
dag=dag,
)

通过提供 response_filter,您可以操纵响应结果,这将是推送到 XCom 的值。在您的情况下,您应该在下一个任务中返回要轮询的端点。

response_filter: A function allowing you to manipulate the responsetext. e.g response_filter=lambda response: json.loads(response.text).The callable takes the response object as the first positional argumentand optionally any number of keyword arguments available in the context dictionary.:type response_filter: A lambda or defined function.

请注意 response_check参数是可选的。

执行 GET 请求的任务:

使用 HttpSensor戳直到 response_check 可调用计算结果为真。

task_http_sensor_check = HttpSensor(
task_id='http_sensor_check',
# http_conn_id='your_conn_id',
endpoint=task_post_op.output,
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)

作为 endpoint 参数,我们使用 XComArg 传递从上一个任务中提取的 XCom 值。 .使用 poke_interval 定义作业在每次尝试之间应等待的时间(以秒为单位)。

记得创建一个Connection您自己定义基本 URL、端口等。

让我知道这是否对您有用!

关于Airflow - 如何处理异步 API 调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67946819/

53 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com