gpt4 book ai didi

airflow - Apache Airflow 1.10+ 调度程序是否支持在特定时间在不同的 DST 感知时区运行 2 个 DAG?

转载 作者:行者123 更新时间:2023-12-05 00:46:46 26 4
gpt4 key购买 nike

Apache Airflow 1.10+引入了对 DST 感知的原生支持 timezones .

这让我认为(也许是错误的)应该可以在同一个 Airflow 调度程序上创建 2 个 DAG,这些调度程序是这样安排的:

  • 每天 06:00 Pacific/Auckland 开始时间
  • 每天 21:00 America/New_York 开始时间

  • 无需引入“休眠”到所需开始时间的任务。该文档明确排除了 DST 感知调度的 cron 调度程序,但只解释了如何将 DAG 设置为在该时区每天运行,默认情况下是午夜。

    以前关于此主题的问题仅考虑使用 cron scheduler或基于 pre-1.10 airflow它没有引入对 DST 感知时区的原生支持。

    在“airflow.cfg”中,我更新了 default_timezone到系统时区。然后我尝试像这样安排 DAG:
    DAG('NZ_SOD',
    description='New Zealand Start of Day',
    start_date=datetime(2018, 12, 11, 06, 00, tzinfo=pendulum.timezone('Pacific/Auckland')),
    catchup=False)

    和:
    DAG('NAM_EOD',
    description='North Americas End of Day',
    start_date=datetime(2018, 12, 11, 21, 00, tzinfo=pendulum.timezone('America/New_York')),
    catchup=False)

    但似乎传递给 start_date 的日期时间对象的“时间”部分在 Apache Airflow 中未明确考虑并会产生意外行为。

    Airflow 是否有任何内置选项来产生所需的行为,或者我是否试图使用错误的工具来完成这项工作?

    最佳答案

    答案是肯定的,cron 计划支持在 DST 感知时区运行 DAG。
    但是有一些警告,所以我不得不假设 Airflow 的维护者没有将此作为受支持的用例。首先是documentation ,截至撰写本文时,明确为 错了当它指出:

    Cron schedules

    In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.


    我已经编写了这个有点老套的代码,它让你看看计划如何在不需要运行 Airflow 实例的情况下工作(注意你安装了 Penulum 1.x 并使用正确的 documentation 如果你运行或编辑此代码) :
    import pendulum
    from airflow import DAG
    from datetime import timedelta


    # Set-up DAG
    test_dag = DAG(
    dag_id='foo',
    start_date=pendulum.datetime(year=2019, month=4, day=4, tz='Pacific/Auckland'),
    schedule_interval='00 03 * * *',
    catchup=False
    )

    # Check initial schedule
    execution_date = test_dag.start_date
    for _ in range(7):
    next_execution_date = test_dag.following_schedule(execution_date)
    if next_execution_date <= execution_date:
    execution_date = test_dag.following_schedule(execution_date + timedelta(hours=2))
    else:
    execution_date = next_execution_date
    print('Execution Date:', execution_date)
    这给了我们 7 天的时间让新西兰体验 DST:
    Execution Date: 2019-04-03 14:00:00+00:00
    Execution Date: 2019-04-04 14:00:00+00:00
    Execution Date: 2019-04-05 14:00:00+00:00
    Execution Date: 2019-04-06 14:00:00+00:00
    Execution Date: 2019-04-07 15:00:00+00:00
    Execution Date: 2019-04-08 15:00:00+00:00
    Execution Date: 2019-04-09 15:00:00+00:00
    正如我们所看到的,使用 cron 时间表观察到 DST,如果您编辑我的代码以删除 cron 时间表,您可以进一步看到 DST 是 不是 观测到的。
    但请注意,即使 cron 计划观察 DST,您可能仍然会在 DST 更改当天出现 1 天错误,因为 Airflow 提供的是前一个日期而不是当前日期(例如日历上的星期日,但在 Airflow 中)执行日期是星期六)。在我看来,这在 follow_schedule 中没有说明。逻辑。
    最后,@dlamblin 指出 Airflow 通过模板字符串或 provide_context=True 为作业提供的变量。如果 DAG 的本地执行日期与 UTC 执行日期不同,对于 Python 可调用对象将是错误的。这可以在 TaskInstance.get_template_context 中观察到。使用 self.execution_date无需将其修改为本地时间。我们可以在 TaskInstance.__init__ 中看到那 self.execution_date转换为 UTC。
    我处理这个问题的方法是派生一个我称之为 local_cal_date 的变量。通过执行@dlamblin 的建议并使用 convert Pendulum 的方法。编辑此代码以满足您的特定需求(我实际上在所有 Python 可调用对象的包装器中使用它,以便它们都接收 local_cal_date ):
    import datetime

    def foo(*args, dag, execution_date, **kwargs):
    # Derive local execution datetime from dag and execution_date that
    # airflow passes to python callables where provide_context is set to True
    airflow_timezone = dag.timezone
    local_execution_datetime = airflow_timezone.convert(execution_date)

    # I then add 1 day to make it the calendar day
    # and not the execution date which Airflow provides
    local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)
    更新:对于我发现的模板化字符串,最好的方法是创建自定义运算符,在呈现模板之前将自定义变量注入(inject)到上下文中。我发现使用自定义宏的问题是 don't expand other macros automatically ,这意味着您必须做很多额外的工作才能以有用的方式呈现它们。因此,在自定义运算符模块中,我与此代码有些相似:
    # Standard Library
    import datetime

    # Third Party Libraries
    import airflow.operators.email_operator
    import airflow.operators.python_operator
    import airflow.operators.bash_operator


    class CustomTemplateVarsMixin:
    def render_template(self, attr, content, context):
    # Do Calculations
    airflow_execution_datetime = context['execution_date']
    airflow_timezone = context['dag'].timezone
    local_execution_datetime = airflow_timezone.convert(airflow_execution_datetime)
    local_cal_datetime = local_execution_datetime + datetime.timedelta(days=1)

    # Add to contexts
    context['local_cal_datetime'] = local_cal_datetime

    # Run normal Method
    return super().render_template(self, attr, content, context)


    class BashOperator(CustomTemplateVarsMixin, airflow.operators.bash_operator.BashOperator):
    pass


    class EmailOperator(CustomTemplateVarsMixin, airflow.operators.email_operator.EmailOperator):
    pass


    class PythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.PythonOperator):
    pass


    class BranchPythonOperator(CustomTemplateVarsMixin, airflow.operators.python_operator.BranchPythonOperator):
    pass

    关于airflow - Apache Airflow 1.10+ 调度程序是否支持在特定时间在不同的 DST 感知时区运行 2 个 DAG?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53783626/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com