gpt4 book ai didi

airflow - 如何解析 Airflow 模板中的json字符串

转载 作者:行者123 更新时间:2023-12-03 18:31:00 57 4
gpt4 key购买 nike

是否可以在 Airflow 模板中解析 JSON 字符串?

我有一个 HttpSensor,它通过 REST API 监视作业,但作业 ID 位于具有 xcom_push 的上游任务的响应中。标记 True .

我想做类似以下的事情,但是,此代码给出了错误 jinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
http_conn_id="s1",
task_id="job",
endpoint="some_url",
method='POST',
data=json.dumps({ "foo": "bar" }),
xcom_push=True,
dag=dag,
)

t2 = HttpSensor(
http_conn_id="s1",
task_id="finish_job",
endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
response_check=lambda response: True if response.json().state == "complete" else False,
poke_interval=5,
dag=dag
)

t2.set_upstream(t1)

最佳答案

您可以使用参数 user_defined_filters 向 DAG 添加自定义 Jinja 过滤器解析json。

a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG.


dag = DAG(
...
user_defined_filters={'fromjson': lambda s: json.loads(s)},
)

t1 = SimpleHttpOperator(
task_id='job',
xcom_push=True,
...
)

t2 = HttpSensor(
endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
...
)

但是,编写自己的自定义 JsonHttpOperator 可能更干净 plugin (或向 SimpleHttpOperator 添加一个标志)在 returning 之前解析 JSON这样你就可以直接引用 {{ti.xcom_pull("job")["jobId"]在模板中。
class JsonHttpOperator(SimpleHttpOperator):

def execute(self, context):
text = super(JsonHttpOperator, self).execute(context)
return json.loads(text)

关于airflow - 如何解析 Airflow 模板中的json字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47506225/

57 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com