gpt4 book ai didi

directed-acyclic-graphs - Apache Airflow 2.0.0.b2 - 动态 EmailOperator [文件] 属性

转载 作者:行者123 更新时间:2023-12-04 15:11:53 29 4
gpt4 key购买 nike

TL;DR 如何创建一个动态的 EmailOperator,它从作为 XCom 属性的文件路径发送文件

大家好,

我使用的是 Apache Airflow 2.0.0.b2。我的问题是我的 DAG 创建了一个名称在运行时更改的文件。我想通过电子邮件发送此文件,但在将动态文件名输入我的 EmailOperator 时遇到问题。

我尝试过但失败的事情!

  1. files 属性使用模板。

    files=["{{ ti.xcom_pull(key='OUTPUT_CSV') }}"],

    不幸的是,模板只有在运算符中的字段被标记为模板时才有效。 files 不是 EmailOperator 上的可模板化字段

  2. 使用函数动态创建我的任务

     def get_email_operator(?...):
    export_file_path = ti.xcom_pull(key='OUTPUT_CSV')
    email_subject = 'Some Subject'
    return EmailOperator(
    task_id="get_email_operator",
    to=['someemail@somedomain.net'],
    subject=email_subject,
    files=[export_file_path,],
    html_content='<br>',
    dag=current_dag)

    ..task3 >> get_email_operator() >> task4

    不幸的是,我似乎无法弄清楚如何将当前的 **kwargsti 信息传递到我的函数调用中以获取当前文件路径。

编辑: Elad 在下面的回答让我朝着正确的方向前进。我唯一要做的就是在调用 op.execute() 时添加 kwargs

解决方案:

def get_email_operator(**kwargs):
export_file_path = kwargs['ti'].xcom_pull(key='OUTPUT_CSV')
email_subject = 'Termed Drivers - ' + date_string
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[export_file_path,],
html_content='<br>')
op.execute(kwargs)

最佳答案

文件将在 Airflow 2 中被模板化为 PR上周合并。

但是您无需等待,您可以使用您自己的自定义运算符来包装当前运算符,指定模板化字段列表。

喜欢:

class MyEmailOperator(EmailOperator):
template_fields = ('to', 'subject', 'html_content', 'files')

然后您可以在您的代码中使用MyEmailOperator文件将被模板化。

另一个选项是包装 EmailOperator 的 PythonOperator:

def get_email_operator(**context):
xcom = context['ti'].xcom_pull(task_ids='OUTPUT_CSV')
email_subject = 'Some Subject'
op = EmailOperator(
task_id="get_email_operator",
to=['someemail@somedomain.net'],
subject=email_subject,
files=[xcom,],
html_content='<br>')
op.execute(context)

python = PythonOperator(
task_id='archive_s3_file',
dag=dag,
python_callable=get_email_operator,
provide_context=True
)

..task3 >> python >> task4

编辑:

注意:在运算符内部使用运算符不是一个好习惯。您可以在 following answer 中阅读更多有关原因的信息。 .如果您打算使用第二种方法,那么更好的办法是直接使用 send_email 函数(EmailOperator 调用此函数)

关于directed-acyclic-graphs - Apache Airflow 2.0.0.b2 - 动态 EmailOperator [文件] 属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65077581/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com