gpt4 book ai didi

python - 仅当在 python 中使用 DAG 的 AWS athena 表中有新分区/数据可用时,如何触发 Airflow 任务?

转载 作者:行者123 更新时间:2023-12-04 04:13:25 26 4
gpt4 key购买 nike

我有一个像下面这样的场景:

  • 触发 Task 1Task 2仅当源表 (Athena) 中的新数据可供他们使用时。 Task1 和 Task2 的触发器应该在一天内有新的数据分区时发生。
  • 触发器 Task 3仅在 Task 1 完成时和 Task 2
  • 触发器 Task 4仅完成Task 3

  • enter image description here

    我的代码
    from airflow import DAG

    from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
    from datetime import datetime, timedelta

    from airflow.operators.postgres_operator import PostgresOperator
    from utils import FAILURE_EMAILS

    yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

    default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
    }

    dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

    Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

    Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

    execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
    )

    execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
    )



    execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
    )

    execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
    )



    execute_Task1.set_upstream(Athena_Trigger_for_Task1)
    execute_Task2.set_upstream(Athena_Trigger_for_Task2)

    execute_Task3.set_upstream(execute_Task1)
    execute_Task3.set_upstream(execute_Task2)

    execute_Task4.set_upstream(execute_Task3)

    实现它的最佳最佳方式是什么?

    最佳答案

    我相信你的问题解决了两个主要问题:

  • 忘记配置 schedule_interval以一种明确的方式,所以@daily 正在设置您不期望的内容。
  • 依赖外部事件完成执行时如何正确触发和重试dag的执行

  • 简短的回答:使用 cron 作业格式明确设置您的 schedule_interval 并使用传感器运算符(operator)不时检查
    default_args={
    'retries': (endtime - starttime)*60/poke_time
    }
    dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
    Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

    哪里 startime是你的日常任务什么时候开始, endtime在标记为失败和 poke_time 之前,您应该检查事件是否已完成的一天中最后一次是什么时候?是您的间隔时间 sensor_operator将检查事件是否发生。

    如何明确解决 cron 作业
    每当您将 dag 设置为 @daily像你一样:
    dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

    来自 docs ,你可以看到你实际上在做: @daily - Run once a day at midnight
    这现在是有道理的,为什么您会收到超时错误,并在 5 分钟后失败,因为您设置了 'retries': 1'retry_delay': timedelta(minutes=5) .所以它尝试在午夜运行 dag,但失败了。 5 分钟后再次重试并再次失败,因此将其标记为失败。

    所以基本上@daily run 正在设置一个隐式的 cron 作业:
    @daily -> Run once a day at midnight -> 0 0 * * *

    cron 作业格式为以下格式,您将值设置为 *每当您想说“全部”时。
    Minute Hour Day_of_Month Month Day_of_Week
    所以@daily 基本上是说每运行一次:minute 0 hours 0 of all days_of_month of all days_of_week

    因此,您的案例每运行一次:所有 days_of_month of all_months of all days_of_week 的分钟 0 小时 10。这以 cron 作业格式转换为:
    0 10 * * *

    依赖外部事件完成执行时如何正确触发和重试dag的执行
  • 您可以使用命令 airflow trigger_dag 从外部事件触发 Airflow 中的 dag。 .如果您可以通过某种方式触发 lambda 函数/python 脚本来定位您的 Airflow 实例,那么这将是可能的。
  • 如果您无法从外部触发 d​​ag,则使用像 OP 那样的传感器运算符,为其设置 poke_time 并设置合理的高重试次数。
  • 关于python - 仅当在 python 中使用 DAG 的 AWS athena 表中有新分区/数据可用时,如何触发 Airflow 任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61242165/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com