gpt4 book ai didi

python - Airflow/Composer - 在 zip 打包的 DAG 中找不到模板

转载 作者:行者123 更新时间:2023-12-05 07:32:53 24 4
gpt4 key购买 nike

我无法让模板化的 SQL 文件在 Composer 中工作。我认为问题与我将 DAG 打包为 zip 文件以包含其他代码有关。

我从这个开始(只显示相关部分):

dag = DAG('my_dag',
default_args=default_args,
schedule_interval=schedule_interval)

task0 = BigQueryOperator(
task_id='task0',
use_legacy_sql=False,
bql='sql/query_file.sql',
bigquery_conn_id=bigquery_conn_id,
dag=dag)

文件结构如下:

/dags/my_dag_file.py
/dags/sql/query_file.sql
/dags/my_pkg/
/dags/my_pkg/__init__.py
/dags/my_pkg/extra_module.py

我正在像这样压缩它并将其复制到 Composer dags 文件夹:

zip -r my_zip_file.zip *.py my_pkg/ sql/

这在本地有效,但在 Composer 上部署时出现错误:

TemplateNotFound: sql/query_file.sql

我确信我在 zip 中包含了 SQL 文件。我也尝试将它移动到根文件夹(没有 sql/子目录),但我得到了相同的结果。

我在某处读到在实例化 DAG 对象时需要设置 template_searchpath。我没能成功做到这一点。当我尝试相对路径 (sql) 时,我收到更多 TemplateNotFound 错误。当我尝试如下所示的绝对路径时,我得到的是 not a directory

这是我尝试过的:

dag = DAG('my_dag',
default_args=default_args,
schedule_interval=schedule_interval,
template_searchpath = os.path.dirname(__file__) + "/sql"
)

task0 = BigQueryOperator(
task_id='task0',
use_legacy_sql=False,
bql='query_file.sql',
bigquery_conn_id=bigquery_conn_id,
dag=dag)

我还尝试将“sql”作为任务路径的一部分,而不是模板搜索路径,我再次尝试将所有内容移动到根级别,但得到了同样的“不是目录”错误。

据我所知,问题与文件包含在 zip 文件中有关。 __file__ 返回 /home/airflow/gcs/dags/my_zip_file.zip/my_dag_file.py。但是 os.listdir(os.path.dirname(__file__)) 抛出相同的 not a directory 错误。所以也许因为我们在 zip 存档中执行,我们不能以相同的方式使用文件夹和路径。也许 Jinja 被这个绊倒了……?或者在打包 zip 文件时可能还有其他事情要做?

[2018-06-20 15:35:34,837] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-20 15:35:34,838] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-06-20 15:35:34,840] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-06-20 15:35:34,841] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-06-20 15:35:34,841] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2018-06-20 15:35:34,842] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-06-20 15:35:34,843] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2018-06-20 15:35:34,843] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1477, in _run_raw_task
[2018-06-20 15:35:34,844] {base_task_runner.py:98} INFO - Subtask: self.render_templates()
[2018-06-20 15:35:34,844] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1760, in render_templates
[2018-06-20 15:35:34,845] {base_task_runner.py:98} INFO - Subtask: rendered_content = rt(attr, content, jinja_context)
[2018-06-20 15:35:34,847] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 2481, in render_template
[2018-06-20 15:35:34,848] {base_task_runner.py:98} INFO - Subtask: return jinja_env.get_template(content).render(**context)
[2018-06-20 15:35:34,849] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/environment.py", line 812, in get_template
[2018-06-20 15:35:34,849] {base_task_runner.py:98} INFO - Subtask: return self._load_template(name, self.make_globals(globals))
[2018-06-20 15:35:34,850] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/environment.py", line 774, in _load_template
[2018-06-20 15:35:34,851] {base_task_runner.py:98} INFO - Subtask: cache_key = self.loader.get_source(self, name)[1]
[2018-06-20 15:35:34,852] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/loaders.py", line 171, in get_source
[2018-06-20 15:35:34,854] {base_task_runner.py:98} INFO - Subtask: f = open_if_exists(filename)
[2018-06-20 15:35:34,855] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/jinja2/utils.py", line 151, in open_if_exists
[2018-06-20 15:35:34,856] {base_task_runner.py:98} INFO - Subtask: return open(filename, mode)
[2018-06-20 15:35:34,856] {base_task_runner.py:98} INFO - Subtask: IOError: [Errno 20] Not a directory: '/home/airflow/gcs/dags/my_zip_file.zip/sql/query_file.sql'

最佳答案

目前看来 Airflow(从 1.10 版本开始)支持从压缩 DAG 加载模板,因为它仅使用 jinja2.FileSystemLoader 来加载它们(参见 DAG#get_template_env ).

关于python - Airflow/Composer - 在 zip 打包的 DAG 中找不到模板,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50952568/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com