gpt4 book ai didi

airflow - 在 Airflow HttpSensor 运算符中使用 python 字符串替换和 xcom_pull

转载 作者:行者123 更新时间:2023-12-04 00:26:34 26 4
gpt4 key购买 nike

我有一个用例,我在 for 循环中,需要动态填充 HttpSensor 任务中的字段

我尝试使用这种语法:

方法 1 失败:

s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
task_id=s,
http_conn_id='ss_api',
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t),
request_params={'X-Requested-By': 'abc_123'},
response_check=lambda response: True if "FINISHED" in response.text else False,
poke_interval=10,
soft_fail=True,
timeout=600,
dag=dag_subdag,
)

但它失败了,因为在这一行:
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids={})}}/status?rev=0".format(t)

我无法使用 .format(t) 进行 python 字符串替换。

相反,如果我对某些值进行硬编码,则上述代码可以正常工作……例如,下面的代码可以正常工作:

方法 2 成功:
s = 'sensor_task_sd_{0}'.format(d)
sensor_task_sd = HttpSensor(
task_id=s,
http_conn_id='ss_api',
endpoint="/rest/v1/pipeline/{{ti.xcom_pull(key='curr_ss_pipe_id', task_ids='start_pipeline_sd_campaignhistory')}}/status?rev=0",
request_params={'X-Requested-By': 'abc_123'},
response_check=lambda response: True if "FINISHED" in response.text else False,
poke_interval=10,
soft_fail=True,
timeout=600,
dag=dag_subdag)

我只是不明白这一点..... 我已经尝试了各种技巧组合来使它工作,它只是不采用我用来保持代码动态的字符串插值。

所以我的问题很简单:

如何使 HttpSensor 操作符动态化?我不想在端点字符串中硬编码我的函数值(方法 2 样式),我想使用在运行时设置的值(方法 1 样式)。

我怎样才能使方法 1 工作?

最佳答案

所以 Airflow 使用 Jinja 来模板化它的字符串,当你混合 Jinja 模板和 Python 格式时,你需要“转义”Jinja 需要的大括号,这样 Python 格式就不会消耗它们。您可以通过将不属于 .format() 的每个大括号加倍来实现。称呼。
这应该会给你你需要的结果。

endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={}) }}}}/status?rev=0".format(t)
顺便说一句,根据我使用 f-strings (Python 3.6+) 或命名格式参数的经验,如果您能够在 Airflow 脚本中混合两者时,确实可以帮助代码清晰。但是您仍然需要“转义”大括号。
f 字符串:
endpoint = f"/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={t})}} }}/status?rev=0"
命名格式参数:
endpoint = "/rest/v1/pipeline/{{{{ ti.xcom_pull(key='curr_ss_pipe_id', task_ids={task_id}) }}}}/status?rev=0".format(task_id=t)
希望有帮助:)

关于airflow - 在 Airflow HttpSensor 运算符中使用 python 字符串替换和 xcom_pull,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56230150/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com