- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我广泛搜索了 Airflow 博客和文档来调试我遇到的问题。
我想要解决的问题
检查 ftp 服务器上是否存在特定文件
如果存在,请将其上传到云端
如果不存在,请向客户发送电子邮件,报告未找到文件
我有什么
扩展 BaseOperator 的自定义运算符,使用 SSH Hook 并推送值(true 或 false)。
使用 BranchPythonOperator 从 xcom 中提取值并检查上一个任务是否返回 true 或 false 并决定下一个任务的任务。
请看下面的代码。这段代码是我正在尝试做的事情的简化版本。
如果有人对我的原始代码感兴趣,请向下滚动到问题的末尾。
此处,自定义运算符仅根据分钟的偶数或奇数返回字符串偶数或奇数。
import logging
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from datetime import datetime
log = logging.getLogger(__name__)
class MediumTestOperator(BaseOperator):
@apply_defaults
def __init__(self,
do_xcom_push=True,
*args,
**kwargs):
super(MediumTestOperator, self).__init__(*args, **kwargs)
self.do_xcom_push = do_xcom_push
self.args = args
self.kwargs = kwargs
def execute(self, context):
# from IPython import embed; embed()
current_minute = datetime.now().minute
context['ti'].xcom_push(key="Airflow", value="Apache Incubating")
if current_minute %2 == 0:
context['ti'].xcom_push(key="minute", value="Even")
else:
context['ti'].xcom_push(key="minute", value="Odd")
# from IPython import embed; embed()
class MediumTestOperatorPlugin(AirflowPlugin):
name = "medium_test"
operators = [MediumTestOperator]
文件:caller.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from medium_payen_op import MediumTestOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'guillaume',
'depends_on_past': False,
'start_date': datetime(2018, 6, 18),
'email': ['hello@moonshots.ai'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'Weekday',
default_args=default_args,
schedule_interval="@once")
sample_task = MediumTestOperator(
task_id='task_1',
provide_context=True,
dag=dag
)
def get_branch_follow(**kwargs):
x = kwargs['ti'].xcom_pull(task_ids='task_1', key="minute")
print("From Kwargs: ", x)
if x == 'Even':
return 'task_3'
else:
return 'task_4'
task_2 = BranchPythonOperator(
task_id='task_2_branch',
python_callable=get_branch_follow,
provide_context=True,
dag=dag
)
def get_dample(**kwargs):
x = kwargs['ti'].xcom_pull(task_ids='task_1', key="minute")
y = kwargs['ti'].xcom_pull(task_ids='task_1', key="Airflow")
print("Minute is:", x, " Airflow is from: ", y)
print("Task 3 Running")
task_3 = PythonOperator(
python_callable=get_dample,
provide_context=True,
dag=dag,
task_id='task_3'
)
def get_dample(**kwargs):
x = kwargs['ti'].xcom_pull(task_ids='task_1', key="minute")
y = kwargs['ti'].xcom_pull(task_ids='task_1', key="Airflow")
print("Minute is:", x, " Airflow is from: ", y)
print("Task 4 Running")
task_4 = PythonOperator(
python_callable=get_dample,
provide_context=True,
dag=dag,
task_id='task_4'
)
sample_task >> task_3
task_2 >> task_3
task_2 >> task_4
正如您从附图中看到的,Xcom 推送确实有效,我可以从 PythonOperator 中提取值,但不能从 BranchPythonOperator 中提取值。
感谢任何帮助。
Xcom 从 BranchPythonOperator 的 Python Callable 内部拉取始终返回“None”,导致 Else block 始终运行。
Xcom 从 PythonOperator 中提取会返回正确的值。
<小时/>这是我正在使用的原始代码
自定义运算符将字符串 True 或 False 作为 Xcom 值推送,然后由 BranchPythonOperator 读取。
我想读取在 BranchPythonOperator 任务中使用上述自定义运算符创建的任务推送的值,并根据返回值选择不同的路径。
文件:check_file_exists_operator.py
import logging
from tempfile import NamedTemporaryFile
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
log = logging.getLogger(__name__)
class CheckFileExistsOperator(BaseOperator):
"""
This operator checks if a given file name exists on the
the sftp server.
Returns true if it exists, false otherwise.
:param sftp_path_prefix: The sftp remote path. This is the specified file path
for downloading the file from the SFTP server.
:type sftp_path_prefix: string
:param file_to_be_processed: File that is to be Searched
:type file_to_be_processed: str
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:type sftp_conn_id: string
:param timeout: timeout (in seconds) for executing the command.
:type timeout: int
:param do_xcom_push: return the stdout which also get set in xcom by
airflow platform
:type do_xcom_push: bool
"""
FORWARD_SLASH_LITERAL = '/'
template_fields = ('file_to_be_processed',)
@apply_defaults
def __init__(self,
sftp_path_prefix,
file_to_be_processed,
sftp_conn_id='ssh_default',
timeout=10,
do_xcom_push=True,
*args,
**kwargs):
super(CheckFileExistsOperator, self).__init__(*args, **kwargs)
self.sftp_path_prefix = sftp_path_prefix
self.file_to_be_processed = file_to_be_processed
self.sftp_conn_id = sftp_conn_id
self.timeout = timeout
self.do_xcom_push = do_xcom_push
self.args = args
self.kwargs = kwargs
def execute(self, context):
# Refer to https://docs.paramiko.org/en/2.4/api/sftp.html
ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
sftp_client = ssh_hook.get_conn().open_sftp()
sftp_file_absolute_path = self.sftp_path_prefix.strip() + \
self.FORWARD_SLASH_LITERAL + \
self.file_to_be_processed.strip()
task_instance = context['task_instance']
log.debug('Checking if the follwoing file exists: %s', sftp_file_absolute_path)
try:
with NamedTemporaryFile("w") as temp_file:
sftp_client.get(sftp_file_absolute_path, temp_file.name)
# Return a string equivalent of the boolean.
# Returning a boolean will make the key unreadable
params = {'file_exists' : True}
self.kwargs['params'] = params
task_instance.xcom_push(key="file_exists", value='True')
log.info('File Exists, returning True')
return 'True'
except FileNotFoundError:
params = {'file_exists' : False}
self.kwargs['params'] = params
task_instance.xcom_push(key="file_exists", value='False')
log.info('File Does not Exist, returning False')
return 'False'
class CheckFilePlugin(AirflowPlugin):
name = "check_file_exists"
operators = [CheckFileExistsOperator]
文件:airflow_dag_sample.py
import logging
from airflow import DAG
from check_file_exists_operator import CheckFileExistsOperator
from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import timedelta, datetime
from dateutil.relativedelta import relativedelta
from airflow.operators.email_operator import EmailOperator
log = logging.getLogger(__name__)
FORWARD_SLASH_LITERAL = '/'
default_args = {
'owner': 'gvatreya',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'email': ['***@***.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=2),
'timeout': 10,
'sftp_conn_id': 'sftp_local_cluster',
'provide_context': True
}
dag = DAG('my_test_dag',
description='Another tutorial DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2017, 3, 20),
default_args=default_args,
template_searchpath='/Users/your_name/some_path/airflow_home/sql',
catchup=False)
template_filename_from_xcom = """
{{ task_instance.xcom_pull(task_ids='get_fname_ships', key='file_to_be_processed', dag_id='my_test_dag') }}
"""
template_file_prefix_from_xcom = """
{{ task_instance.xcom_pull(task_ids='get_fname_ships', key="month_prefix_for_file", dag_id='my_test_dag') }}
"""
t_check_file_exists = CheckFileExistsOperator(
sftp_path_prefix='/toDjembe',
file_to_be_processed=template_filename_from_xcom.strip(),
sftp_conn_id='sftp_local_cluster',
task_id='check_file_exists',
dag=dag
)
def branch(**kwargs):
file_exist = kwargs['task_instance'].xcom_pull(task_ids='get_fname_ships', key="file_exists",
dag_id='my_test_dag')
print(template_filename_from_xcom)
from IPython import embed; embed()
log.debug("FILE_EXIST(from branch): %s", file_exist)
if file_exist:
return 's3_upload'
else:
return 'send_file_not_found_email'
t_branch_on_file_existence = BranchPythonOperator(
task_id='branch_on_file_existence',
python_callable=branch,
dag=dag
)
t_send_file_not_found_email = EmailOperator(
task_id='send_file_not_found_email',
to='***@***.com',
subject=template_email_subject.format(state='FAILURE',filename=template_filename_from_xcom.strip(),content='Not found on SFTP Server'),
html_content='File Not Found in SFTP',
mime_charset='utf-8',
dag=dag
)
t_upload_to_s3 = SFTPToS3Operator(
task_id='s3_upload',
sftp_conn_id='sftp_local_cluster',
sftp_path='/djembe/' + template_filename_from_xcom.strip(),
s3_conn_id='s3_conn',
s3_bucket='djembe-users',
s3_key='gvatreya/experiment/' + template_file_prefix_from_xcom.strip() + FORWARD_SLASH_LITERAL + template_filename_from_xcom.strip(),
dag=dag
)
t_check_file_exists >> t_branch_on_file_existence
t_branch_on_file_existence >> t_upload_to_s3
t_branch_on_file_existence >> t_send_file_not_found_email
但是,当我运行代码时,分支运算符总是看到字符串“None”。
但是,Xcom 的值为 true。
我尝试使用 IPython embed()
进行调试,发现任务实例不保存 xcom 的值。我尝试使用参数以及其他我能想到的东西,但无济于事。
花了几天时间研究这个问题后,我现在开始认为我错过了 Airflow 中 XCom 的一些重要内容。
希望大家能帮忙。
提前致谢。
最佳答案
我认为,问题在于依赖性。
您目前拥有以下内容:
sample_task >> task_3
task_2 >> task_3
task_2 >> task_4
将其更改为以下内容,即添加 sample_task >> tasK_2
行。
sample_task >> task_3
sample_task >> tasK_2
task_2 >> task_3
task_2 >> task_4
推送到 xcom 的任务应该在使用 BranchPythonOperator
的任务之前先运行
在第二个示例中,branch
函数使用xcom_pull(task_ids='get_fname_ships'
),但我找不到任何带有get_fname_ships
的任务任务id。
关于python - Airflow - 在 BranchPythonOperator 中访问 Xcom,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55613873/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!