- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Airflow PythonOperator 来使用数据流运行器执行 python Beam 作业。Dataflow 作业返回错误 "ModuleNotFoundError: No module named 'airflow'"
在 DataFlow UI 中,使用 PythonOperator 调用作业时使用的 SDK 版本是 2.15.0。如果作业是从 Cloud shell 执行的,所使用的 SDK 版本是 2.23.0。该工作在从以下位置启动时有效外壳。
Composer 的环境详细信息是:
Image version = composer-1.10.3-airflow-1.10.3
Python version= 3
之前的帖子建议使用 PythonVirtualenvOperator 运算符。我尝试使用以下设置:
requirements=['apache-beam[gcp]'],
python_version=3
Composer 返回错误 "'install', 'apache-beam[gcp]']' returned non-zero exit status 2."
如有任何建议,我们将不胜感激。
这是调用数据流作业的 DAG。我没有展示 DAG 中使用的所有函数,但将导入保留在:
import logging
import pprint
import json
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.models import DAG
import google.cloud.logging
from datetime import timedelta
from airflow.utils.dates import days_ago
from deps import utils
from google.cloud import storage
from airflow.exceptions import AirflowException
from deps import logger_montr
from deps import dataflow_clean_csv
dag = DAG(dag_id='clean_data_file',
default_args=args,
description='Runs Dataflow to clean csv files',
schedule_interval=None)
def get_values_from_previous_dag(**context):
var_dict = {}
for key, val in context['dag_run'].conf.items():
context['ti'].xcom_push(key, val)
var_dict[key] = val
populate_ti_xcom = PythonOperator(
task_id='get_values_from_previous_dag',
python_callable=get_values_from_previous_dag,
provide_context=True,
dag=dag,
)
dataflow_clean_csv = PythonOperator(
task_id = "dataflow_clean_csv",
python_callable = dataflow_clean_csv.clean_csv_dataflow,
op_kwargs= {
'project':
'zone':
'region':
'stagingLocation':
'inputDirectory':
'filename':
'outputDirectory':
},
provide_context=True,
dag=dag,
)
populate_ti_xcom >> dataflow_clean_csv
我使用 ti.xcom_pull(task_ids = 'get_values_from_previous_dag') 方法分配 op_kwargs。
这是被调用的数据流作业:
import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
def parse_file(element):
for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL):
line = [s.replace('\"', '') for s in line]
clean_line = '","'.join(line)
final_line = '"'+ clean_line +'"'
return final_line
def clean_csv_dataflow(**kwargs):
argv = [
# Dataflow pipeline options
"--region={}".format(kwargs["region"]),
"--project={}".format(kwargs["project"]) ,
"--temp_location={}".format(kwargs["stagingLocation"]),
# Setting Dataflow pipeline options
'--save_main_session',
'--max_num_workers=8',
'--autoscaling_algorithm=THROUGHPUT_BASED',
# Mandatory constants
'--job_name=cleancsvdataflow',
'--runner=DataflowRunner'
]
options = PipelineOptions(
flags=argv
)
pipeline = beam.Pipeline(options=options)
inputDirectory = kwargs["inputDirectory"]
filename = kwargs["filename"]
outputDirectory = kwargs["outputDirectory"]
outputfile_temp = filename
outputfile_temp = outputfile_temp.split(".")
outputfile = "_CLEANED.".join(outputfile_temp)
in_path_and_filename = "{}{}".format(inputDirectory,filename)
out_path_and_filename = "{}{}".format(outputDirectory,outputfile)
pipeline = beam.Pipeline(options=options)
clean_csv = (pipeline
| "Read input file" >> beam.io.ReadFromText(in_path_and_filename)
| "Parse file" >> beam.Map(parse_file)
| "writecsv" >> beam.io.WriteToText(out_path_and_filename,num_shards=1)
)
pipeline.run()
最佳答案
此答案由@BSpinoza 在评论区提供:
What I did was move all
imports
from the global namespace and placethem into the function definitions. Then, from the calling DAG I usedtheBashOperator
. It worked.
此外,推荐的方法之一是使用 DataFlowPythonOperator .
关于python - 模块未找到错误 : No module named 'airflow' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63351208/
我有 4 个文件。 C:\perlCode2\start.pl6 C:\perlCode2\file0.pm6 C:\perlCode2\folder1\file1.pm6 C:\perlCode2\
我有一个结构如下的模块: /module __init__.py /submod_1 __init__.py submod_1_class.py
我的源代码在 java 7 上编译并在 java 11 上运行。 我正在尝试将 imperva RASP 作为 java 代理集成到 tomcat 中。但是,当我启动 tomcat 服务器时,它抛出以
justExport.js const first = () => { console.log('frist from justExport') } const second = () => {
以下模块用JS文件编写: module.exports = { propA: 1, propB: 2 } 允许稍后从模块导入属性,如:从“路径/到/模块”导入 { propA} 然而,将文件格
我一直在尝试在嵌套的惰性加载模块中实现ngx翻译,但一直未能如愿。我面临的唯一问题是,每当我通过选择器更改语言时,嵌套延迟加载模块中的语言都不会更改。 HttpLoader 工作正常,其他一切工作正常
我没有可重复的示例,因为问题更多是关于模块如何工作。我试图了解如何将一些 react 功能从一个模块传递到下一个模块。过去我收到过有关使用 ObserveEvent 的回复,但是当我在一个模块中使用响
我正在阅读Wikipedia's definition of Dependency inversion principle ,它使用了两个术语高级模块和低级模块,我无法弄清楚。 它们是什么以及依赖倒置
问题 我遇到的一个问题是将两个模块的类型和值带入一个新的组合模块中。我举个例子。目前我有以下两种类型签名 module type Ordered = sig type t (* the type
我是 JavaScript 的新手,最近一直在努力处理导入问题。有一件事我无法理解。 在较旧的节点模块(主要是那些在 ES6 之前出现的模块)中,可以使用 npm 安装,例如 express,通常没有
我正在尝试使用 System.JS 将 material-ui 导入我的 React 应用 在我的应用中,我这样做: import {AppBar, Tabs, Tab, Card, CardTitl
我想使用功能module->exports查找模块提供的所有导出。不幸的是,传递给该函数的模块必须在当前命名空间中声明,然后才能在其上使用该函数。当我静态地知道模块是什么时,这没问题,我只需要将其引入
目录结构如下 outdir |--lib |--- __init__.py |--- abc.py |--indir
这与提到的非常相似 here但是评论或回答中提供的每个解决方案都没有解决我的问题。想看看是否还有其他我应该看的东西。我尝试了不同的路径,比如 ./app/mycomponent/mycomponent
我有两个 Angular 模块:main 和 feature: 主/根模块: @NgModule({ imports: [ StoreModule.forRoot({route
我尝试在 Ubuntu 04.12 LTS x64 中安装“Userful MultiSeat-X64-5.0.1 ...”,在安装结束时遇到以下错误: File "", line 6, in Im
我正在尝试优化我的 vendor bundle.js,因为它已经膨胀并且我正在使用 material-ui 库。 import Card from 'material-ui'; // Very bad
错误: Import-Module : The specified module 'msonline' was not loaded because no valid module file was
我在 Server 2008 SP2(64 位)上执行导入模块 ActiveDirectory 时遇到问题。 NET Framework 3.5 SP1 已安装 我下载了 Windows6.0-KB9
嗯,你好! 我正在编写一个脚本来获取 Sql 作业历史记录,并且需要使用“SqlServer”模块。它已安装,但由于上面的错误消息,我无法导入它。当我到达模块路径时,文件夹“SqlServer”存在并
我是一名优秀的程序员,十分优秀!