gpt4 book ai didi

airflow - 创建唯一文件名并在所有 Airflow 任务中访问该文件

转载 作者:行者123 更新时间:2023-12-04 16:10:06 29 4
gpt4 key购买 nike

我们能否在每次运行 airflow dag 时创建唯一的文件名并从所有任务访问该文件?我尝试创建全局变量 (output_filename) 并向其附加时间戳。但是当我在任务中访问该文件名时,每个任务都会生成不同的文件名,因为它在每个任务中计算时间戳。下面是示例代码:

table_name = 'Test_ABC'
start_date = datetime.now()
cur_tmpstp = start_date.strftime('%Y_%m_%d')

output_filename = table_name + "_" + cur_tmpstp + ".csv"
S3_landing_path = "s3://abc/"

def clean_up():
if os.path.exists(output_filename):
os.remove(output_filename)


task_1 = BashOperator(
task_id='task_1',
bash_command="aws s3 cp %s %s/ " %(output_filename, S3_landing_path, ),
dag=dag)

task_2_cleanup = PythonOperator(
task_id='task_2_cleanup',
python_callable=clean_up,
dag=dag)

我们有更多的任务需要访问 output_filename。我们如何在所有任务中访问 output_filename 全局变量?

最佳答案

如果您只需要具有日粒度的时间戳,那么您可以使用带有模板的默认变量。此类变量的一些示例(取自 http://airflow.readthedocs.io/en/latest/code.html#default-variables )是

{{ ds }}    the execution date as YYYY-MM-DD
{{ ds_nodash }} the execution date as YYYYMMDD
{{ execution_date }} the execution_date, (datetime.datetime)

关于airflow - 创建唯一文件名并在所有 Airflow 任务中访问该文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44059484/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com