gpt4 book ai didi

python-2.7 - 如何使用 Cloud Composer/Apache Airflow 运行带有设置文件的 Dataflow 管道?

转载 作者:行者123 更新时间:2023-12-02 03:29:18 25 4
gpt4 key购买 nike

我有一个工作数据流管道,首先运行setup.py来安装一些本地帮助程序模块。我现在想使用 Cloud Composer/Apache Airflow 来调度管道。我已经创建了 DAG 文件,并将其与管道项目一起放置在指定的 Google Storage DAG 文件夹中。文件夹结构如下所示:

{Composer-Bucket}/
dags/
--DAG.py
Pipeline-Project/
--Pipeline.py
--setup.py
Module1/
--__init__.py
Module2/
--__init__.py
Module3/
--__init__.py

我的 DAG 中指定 setup.py 文件的部分如下所示:

resumeparserop = dataflow_operator.DataFlowPythonOperator(
task_id="resumeparsertask",
py_file="gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/Pipeline.py",
dataflow_default_options={
"project": {PROJECT-NAME},
"setup_file": "gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py"})

但是,当我查看 Airflow Web UI 中的日志时,出现错误:

RuntimeError: The file gs://{COMPOSER-BUCKET}/dags/Pipeline-Project/setup.py cannot be found. It was specified in the --setup_file command line option.

我不知道为什么找不到安装文件。如何使用设置文件/模块运行我的数据流管道?

最佳答案

如果您查看 DataflowPythonOperator 的代码看起来主 py_file 可以是 GCS 存储桶内的文件,并在执行管道之前由运算符(operator)本地化。但是,我没有看到 dataflow_default_options 类似的内容。看起来这些选项只是简单地复制并格式化。

由于 GCS dag 文件夹是使用 Cloud Storage Fuse 安装在 Airflow 实例上的您应该能够使用“dags_folder”环境变量在本地访问该文件。即你可以这样做:

from airflow import configuration
....
LOCAL_SETUP_FILE = os.path.join(
configuration.get('core', 'dags_folder'), 'Pipeline-Project', 'setup.py')

然后,您可以将 LOCAL_SETUP_FILE 变量用于 dataflow_default_options 中的 setup_file 属性。

关于python-2.7 - 如何使用 Cloud Composer/Apache Airflow 运行带有设置文件的 Dataflow 管道?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52320531/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com