gpt4 book ai didi

python-3.x - 没有名为 airfow.gcp 的模块 - 如何运行使用 python3/beam 2.15 的数据流作业?

转载 作者:行者123 更新时间:2023-12-04 12:26:27 25 4
gpt4 key购买 nike

当我使用 BigQueryHook 之类的运算符/ Hook 时,我看到一条消息,指出这些运算符已被弃用并使用 Airflow .gcp... 运算符版本。但是,当我尝试在我的 dag 中使用它时,它会失败并说没有名为 airflow.gcp 的模块。我有最新的 Airflow Composer 版本,带有 beta 功能,python3。是否有可能以某种方式安装这些运算符?

我正在尝试使用梁 2.15 在 python 3 中运行数据流作业。我试过 virtualenv 运算符,但这不起作用,因为它只允许 python2.7。我怎样才能做到这一点?

最佳答案

Composer 中可用的最新 Airflow 版本是 1.10.2 或 1.10.3(取决于地区)。到那时,这些运营商在 contrib 部分。

专注于如何使用 Composer 运行 Python 3 Dataflow 作业,您需要发布新版本。但是,如果您需要立即解决方案,您可以尝试向后移植 fix .

在这种情况下,我定义了 DataFlow3Hook它扩展了正常的 DataFlowHook但它没有硬编码 python2start_python_dataflow方法:

class DataFlow3Hook(DataFlowHook):
def start_python_dataflow(
...
py_interpreter: str = "python3"
):

...

self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
label_formatter)

然后我们将有我们的自定义 DataFlowPython3Operator调用新钩子(Hook):

class DataFlowPython3Operator(DataFlowPythonOperator):

def execute(self, context):
...
hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
poll_sleep=self.poll_sleep)
...
hook.start_python_dataflow(
self.job_name, formatted_options,
self.py_file, self.py_options, py_interpreter="python3")

最后,在我们的 DAG 中,我们只使用 new 运算符:

task = DataFlowPython3Operator(
py_file='/home/airflow/gcs/data/main.py',
task_id=JOB_NAME,
dag=dag)

查看完整代码 here .作业使用 Python 3.6 运行:

enter image description here

使用的环境详细信息和依赖项(Beam 作业是一个最小示例):

softwareConfig:
imageVersion: composer-1.8.0-airflow-1.10.3
pypiPackages:
apache-beam: ==2.15.0
google-api-core: ==1.14.3
google-apitools: ==0.5.28
google-cloud-core: ==1.0.3
pythonVersion: '3'

让我知道这是否适合你。如果是这样,我建议将代码移动到插件中以提高代码可读性并在 DAG 中重用它。

关于python-3.x - 没有名为 airfow.gcp 的模块 - 如何运行使用 python3/beam 2.15 的数据流作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58545759/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com