
python - ModuleNotFoundError: No module named 'airflow'


I am using the Airflow PythonOperator to execute a Python Beam job with the Dataflow runner. The Dataflow job returns the error "ModuleNotFoundError: No module named 'airflow'".

In the Dataflow UI, the SDK version used when the job is invoked via the PythonOperator is 2.15.0. When the job is executed from Cloud Shell, the SDK version used is 2.23.0. The job works when launched from the shell.

The Composer environment details are:

Image version = composer-1.10.3-airflow-1.10.3

Python version= 3

A previous post suggested using the PythonVirtualenvOperator. I tried it with the following settings:

requirements=['apache-beam[gcp]'],

python_version=3

Composer returns the error "'install', 'apache-beam[gcp]']' returned non-zero exit status 2."

Any suggestions would be appreciated.
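For reference, a minimal sketch of what the PythonVirtualenvOperator attempt above might look like in full. The task id, callable name, and DAG object are placeholders, not from the original post; note that imports must live inside the callable, since the function runs in a fresh virtualenv containing only the listed requirements:

```python
from airflow.operators.python_operator import PythonVirtualenvOperator

def run_beam_job():
    # Import inside the callable: the virtualenv only has the
    # packages listed in 'requirements' installed.
    import apache_beam as beam
    # ... build and run the pipeline here ...

beam_in_venv = PythonVirtualenvOperator(
    task_id='beam_in_venv',              # placeholder name
    python_callable=run_beam_job,
    requirements=['apache-beam[gcp]'],
    python_version='3',                  # passed as a string here
    system_site_packages=False,
    dag=dag,
)
```

Passing `python_version` as a string rather than the integer 3 is worth trying, since the operator forwards it to the virtualenv command line.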

Here is the DAG that calls the Dataflow job. I have not shown all of the functions used in the DAG, but have kept the imports:

import logging
import pprint
import json
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.models import DAG
import google.cloud.logging
from datetime import timedelta
from airflow.utils.dates import days_ago
from deps import utils
from google.cloud import storage
from airflow.exceptions import AirflowException
from deps import logger_montr
from deps import dataflow_clean_csv



dag = DAG(dag_id='clean_data_file',
          default_args=args,
          description='Runs Dataflow to clean csv files',
          schedule_interval=None)

def get_values_from_previous_dag(**context):
    var_dict = {}
    for key, val in context['dag_run'].conf.items():
        context['ti'].xcom_push(key, val)
        var_dict[key] = val

populate_ti_xcom = PythonOperator(
    task_id='get_values_from_previous_dag',
    python_callable=get_values_from_previous_dag,
    provide_context=True,
    dag=dag,
)


dataflow_clean_csv = PythonOperator(
    task_id='dataflow_clean_csv',
    python_callable=dataflow_clean_csv.clean_csv_dataflow,
    op_kwargs={
        'project':
        'zone':
        'region':
        'stagingLocation':
        'inputDirectory':
        'filename':
        'outputDirectory':
    },
    provide_context=True,
    dag=dag,
)

populate_ti_xcom >> dataflow_clean_csv

I assign the op_kwargs values using the ti.xcom_pull(task_ids='get_values_from_previous_dag') method.
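As a sketch of that pull, assuming Airflow 1.10.x with provide_context=True (the key names below are illustrative, not from the original post), the task instance arrives in the callable's kwargs as 'ti' and the pushed values can be read back per key:

```python
def clean_csv_dataflow(**kwargs):
    # With provide_context=True, Airflow injects the TaskInstance as 'ti'.
    ti = kwargs['ti']
    # Pull a value pushed by the upstream task; 'project' is an example key.
    project = ti.xcom_pull(task_ids='get_values_from_previous_dag',
                           key='project')
```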

Here is the Dataflow job that gets called:

import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText


def parse_file(element):
    for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL):
        line = [s.replace('\"', '') for s in line]
        clean_line = '","'.join(line)
        final_line = '"' + clean_line + '"'
        return final_line

def clean_csv_dataflow(**kwargs):
    argv = [
        # Dataflow pipeline options
        "--region={}".format(kwargs["region"]),
        "--project={}".format(kwargs["project"]),
        "--temp_location={}".format(kwargs["stagingLocation"]),
        # Dataflow tuning options
        '--save_main_session',
        '--max_num_workers=8',
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        # Mandatory constants
        '--job_name=cleancsvdataflow',
        '--runner=DataflowRunner'
    ]
    options = PipelineOptions(flags=argv)

    inputDirectory = kwargs["inputDirectory"]
    filename = kwargs["filename"]
    outputDirectory = kwargs["outputDirectory"]

    outputfile_temp = filename.split(".")
    outputfile = "_CLEANED.".join(outputfile_temp)

    in_path_and_filename = "{}{}".format(inputDirectory, filename)
    out_path_and_filename = "{}{}".format(outputDirectory, outputfile)

    pipeline = beam.Pipeline(options=options)

    clean_csv = (pipeline
                 | "Read input file" >> beam.io.ReadFromText(in_path_and_filename)
                 | "Parse file" >> beam.Map(parse_file)
                 | "writecsv" >> beam.io.WriteToText(out_path_and_filename, num_shards=1)
                 )

    pipeline.run()

Best Answer

This answer was provided by @BSpinoza in the comments:

What I did was move all imports from the global namespace and place them into the function definitions. Then, from the calling DAG I used the BashOperator. It worked.

Additionally, one recommended approach is to use the DataFlowPythonOperator.
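The likely reason moving the imports works: with --save_main_session, Beam pickles the launching module's global namespace and restores it on the Dataflow workers, so a module-level `import airflow` travels to workers that have no airflow installed. Imports made inside a function never enter that namespace. A minimal sketch of the pattern, using the stdlib `json` module standing in for airflow/apache_beam (the function name is illustrative):

```python
def clean_csv_dataflow(**kwargs):
    # Imported here, 'json' never enters the module's global
    # namespace, so it would not be captured by save_main_session.
    import json
    return json.dumps({"runner": "DataflowRunner"})

# No module-level import of json occurred:
assert 'json' not in globals()
print(clean_csv_dataflow())  # → {"runner": "DataflowRunner"}
```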

Regarding python - ModuleNotFoundError: No module named 'airflow', we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63351208/
