gpt4 book ai didi

airflow - 谷歌数据流 : Import custom Python module

转载 作者:行者123 更新时间:2023-12-05 05:06:41 28 4
gpt4 key购买 nike

我尝试在 Google Cloud Dataflow 中运行 Apache Beam 管道 (Python),由 Google Cloud Coomposer 中的 DAG 触发。

我在各自GCS bucket中的dags文件夹结构如下:

/dags/
dataflow.py <- DAG
dataflow/
pipeline.py <- pipeline
setup.py
my_modules/
__init__.py
commons.py <- the module I want to import in the pipeline

setup.py 非常基础,但根据 Apache Beam 文档和关于 SO 的回答:

import setuptools

setuptools.setup(setuptools.find_packages())

在 DAG 文件 (dataflow.py) 中,我设置了 setup_file 选项并将其传递给 Dataflow:

default_dag_args = {
... ,
'dataflow_default_options': {
... ,
'runner': 'DataflowRunner',
'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
}
}

在管道文件 (pipeline.py) 中,我尝试使用

from my_modules import commons

但这失败了。 Google Cloud Composer (Apache Airflow) 中的日志显示:

gcp_dataflow_hook.py:132} WARNING - b'  File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n    from my_modules import commons\n           ^\nSyntaxError: invalid syntax'

setup.py 文件背后的基本思想已记录在案 here

此外,关于 SO 的类似问题对我有帮助:

Google Dataflow - Failed to import custom python modules

Dataflow/apache beam: manage custom module dependencies

我实际上想知道为什么我的管道因 Syntax Error 而不是 module not found 之类的错误而失败...

最佳答案

我试图重现您的问题然后尝试解决它,所以我创建了与您已有的相同的文件夹结构:

/dags/
dataflow.py
dataflow/
pipeline.py -> pipeline
setup.py
my_modules/
__init__.py
common.py

因此,为了让它工作,我所做的更改是将这些文件夹复制到实例运行代码能够找到它的地方,例如在 /tmp/ 文件夹中实例的。

所以,我的 DAG 应该是这样的:

1 - 首先我要声明我的论点:

default_args = {
'start_date': datetime(xxxx, x, x),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'dataflow_default_options': {
'project': '<project>',
'region': '<region>',
'stagingLocation': 'gs://<bucket>/stage',
'tempLocation': 'gs://<bucket>/temp',
'setup_file': <setup.py>,
'runner': 'DataflowRunner'
}
}

2- 在此之后,我创建了 DAG,在运行数据流任务之前,我将上面创建的整个文件夹目录复制到实例 Task t1 的 /tmp/ 文件夹中,然后,我从/tmp/目录 Task t2 运行管道:

with DAG(
'composer_df',
default_args=default_args,
description='datflow dag',
schedule_interval="xxxx") as dag:

def copy_dependencies():
process = subprocess.Popen(['gsutil','cp', '-r' ,'gs://<bucket>/dags/*',
'/tmp/'])
process.communicate()


t1 = python_operator.PythonOperator(
task_id='copy_dependencies',
python_callable=copy_dependencies,
provide_context=False
)


t2 = DataFlowPythonOperator(task_id="composer_dataflow",
py_file='/tmp/dataflow/pipeline.py', job_name='job_composer')

t1 >> t2

这就是我创建 DAG 文件 dataflow.py 的方式,然后在 pipeline.py 中要导入的包如下所示:

from my_modules import commons

它应该可以正常工作,因为 VM 可以理解文件夹目录。

关于airflow - 谷歌数据流 : Import custom Python module,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59717352/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com