gpt4 book ai didi

jinja2 - 在 Apache Airflow 的 S3KeySensor 中模板化 `bucket_key`

转载 作者:行者123 更新时间:2023-12-05 07:33:37 33 4
gpt4 key购买 nike

Airflow 版本:1.9.0

在 airflow dag 文件中,我有一个名为 run_queryPythonOperator 任务,它在其 python_callable 函数中设置了以下 xcom 变量:

kwargs['ti'].xcom_push(key='query_result_loc', value=query_result_loc)

在同一个 dag 中,我有一个 S3KeySensor 任务,它使用上面的位置作为它的 bucket_key 参数:

S3KeySensor(task_id = 'check_file_in_s3',
bucket_key = '{{ ti.xcom_pull(task_ids="run_query",key="query_result_loc") }}' ,
bucket_name = None,
wildcard_match = False,
poke_interval=60,
timeout=1200,
dag = dag
)

现在,当我运行 dag(在测试模式或 trigger_dag 模式下)时,S3KeySensor 提示缺少 bucket_name,它来自 S3KeySensor definition 中的这段代码:

    class S3KeySensor(BaseSensorOperator):
"""
Waits for a key (a file-like instance on S3) to be present in a S3 bucket.
S3 being a key/value it does not support folders. The path is just a key
a resource.

:param bucket_key: The key being waited on. Supports full s3:// style url
or relative path from root level.
:type bucket_key: str
:param bucket_name: Name of the S3 bucket
:type bucket_name: str
:param wildcard_match: whether the bucket_key should be interpreted as a
Unix wildcard pattern
:type wildcard_match: bool
:param aws_conn_id: a reference to the s3 connection
:type aws_conn_id: str
"""
template_fields = ('bucket_key', 'bucket_name')

@apply_defaults
def __init__(
self, bucket_key,
bucket_name=None,
wildcard_match=False,
aws_conn_id='aws_default',
*args, **kwargs):
super(S3KeySensor, self).__init__(*args, **kwargs)
# Parse
if bucket_name is None:
parsed_url = urlparse(bucket_key)
if parsed_url.netloc == '':
raise AirflowException('Please provide a bucket_name')
else:
bucket_name = parsed_url.netloc
if parsed_url.path[0] == '/':
bucket_key = parsed_url.path[1:]
else:
bucket_key = parsed_url.path
self.bucket_name = bucket_name
self.bucket_key = bucket_key

看起来模板在这个阶段没有被渲染。

如果我注释掉 if block ,它就可以正常工作。这是错误还是模板字段的错误使用?

更新,基于@kaxil 的评论:

  • 在没有提供 bucket_name 且“if” block 未注释的情况下,airflow 甚至无法检测到 dag。在 UI 上,我看到了这个错误:Broken DAG: [/XXXX/YYYY/project_airflow.py] Please provide provide a bucket_name
  • 没有提供 bucket_name,但对“if” block 进行了以下修改(参见删除了 if parsed_url.netloc == '' 检查),它工作正常:

    if bucket_name is None:
    parsed_url = urlparse(bucket_key)
    bucket_name = parsed_url.netloc
    if parsed_url.path[0] == '/':
    bucket_key = parsed_url.path[1:]
    else:
    bucket_key = parsed_url.path
  • 在提供 bucket_name 的情况下,它可以很好地处理 Rendered Template 选项卡下的 bucket_keybucket_name 的呈现值。

最佳答案

您现在可能已经解决了这个问题,但这是因为您没有为 task_ids 提供数组。格式应为 task_ids=["run_query"]。将其更改为此解决了我的问题。

关于jinja2 - 在 Apache Airflow 的 S3KeySensor 中模板化 `bucket_key`,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50514092/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com