triggers - Airflow TriggerDagRunOperator: how to change the execution date

Reposted. Author: 行者123. Updated: 2023-12-02 09:10:09
I noticed that for scheduled tasks the execution date is set in the past, according to the following explanation:

Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.
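
To make the quoted rule concrete, here is a minimal sketch in plain Python (no Airflow imports). The helper name `earliest_start` is hypothetical, and a fixed daily interval is assumed; it only illustrates that a run labelled with a given execution date can start once the interval it covers has ended.

```python
from datetime import datetime, timedelta


def earliest_start(execution_date, schedule_interval):
    """Return the earliest moment a scheduled run for execution_date can start.

    Hypothetical helper for illustration only: the run covers the interval
    that begins at execution_date, so it can start once that interval is over.
    """
    return execution_date + schedule_interval


execution_date = datetime(2016, 2, 19)
start = earliest_start(execution_date, timedelta(days=1))
print(start.isoformat())  # 2016-02-20T00:00:00
```

So the run that summarizes 2016-02-19 carries that date as its execution date even though it actually starts on 2016-02-20.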

However, when one DAG triggers another DAG, the execution time of the triggered DAG is set to now().

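Is there a way to give the triggered DAGs the same execution time as the triggering DAG? Of course, I could rewrite the templates and use yesterday_ds, but that is a tricky solution.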

Best answer

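The following class extends TriggerDagRunOperator so that the execution date can be passed in as a string, which is then converted back into a datetime. It is a bit hacky, but it is the only way I have found to get the job done.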

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder


class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd)

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{execution_date}}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        # The callable receives (context, dro) and returns dro to trigger,
        # or a falsy value to skip.
        python_callable=lambda context, dro: dro if random.getrandbits(1) else None,
        params={},
        dag=my_controller_dag
    )
    """
    # Render execution_date through Jinja before execute() runs.
    template_fields = ('execution_date',)

    def __init__(
        self, trigger_dag_id, python_callable, execution_date,
        *args, **kwargs
    ):
        self.execution_date = execution_date
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs
        )

    def execute(self, context):
        # Convert the rendered string back into a datetime.
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                # Pass the parsed datetime rather than the raw string.
                execution_date=run_id_dt,
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")
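
The string-to-datetime step at the top of `execute` can be tried in isolation. The sketch below uses only the standard library; `build_run_id` and `EXECUTION_DATE_FORMAT` are hypothetical names, and the sample input assumes the template renders as `YYYY-MM-DD HH:MM:SS`, which is what the format string in the class expects. If the rendered value has a different shape in your setup (for example a `T` separator or a UTC offset), `strptime` raises `ValueError` and the format string would need adjusting.

```python
from datetime import datetime

# Same format string as in the operator above.
EXECUTION_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def build_run_id(rendered_execution_date):
    """Parse a rendered execution-date string and derive the run_id.

    Hypothetical helper mirroring the first two lines of execute().
    Returns the parsed datetime and the 'trig__'-prefixed run_id.
    """
    parsed = datetime.strptime(rendered_execution_date, EXECUTION_DATE_FORMAT)
    return parsed, "trig__" + parsed.isoformat()


parsed, run_id = build_run_id("2016-02-19 00:00:00")
print(run_id)  # trig__2016-02-19T00:00:00
```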

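There is one issue you may run into when using this without setting execution_date=now(): your operator will throw a MySQL error if you try to start a DAG with an identical execution_date twice. This is because execution_date and dag_id are used together to build the row index, and rows with identical indexes cannot be inserted.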
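
The duplicate-key behaviour described here can be reproduced without Airflow or MySQL. The sketch below uses an in-memory SQLite database and a hypothetical stand-in table `dag_run_demo` with a unique constraint on `(dag_id, execution_date)`; it is not Airflow's real schema, only an illustration of why a second insert with the same pair is rejected.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical stand-in table, not Airflow's actual schema.
conn.execute(
    "CREATE TABLE dag_run_demo ("
    " dag_id TEXT NOT NULL,"
    " execution_date TEXT NOT NULL,"
    " UNIQUE (dag_id, execution_date))"
)


def insert_run(dag_id, execution_date):
    """Insert one row; return True on success, False on a duplicate key."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run_demo (dag_id, execution_date) VALUES (?, ?)",
                (dag_id, execution_date),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = insert_run("my_target_dag_id", "2016-02-19 00:00:00")
second = insert_run("my_target_dag_id", "2016-02-19 00:00:00")  # same pair
third = insert_run("my_target_dag_id", "2016-02-20 00:00:00")  # new date
print(first, second, third)  # True False True
```

Only the repeated pair is rejected; the same DAG id with a different datetime inserts normally.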

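I can't think of a reason why you would ever want to run two identical DAGs with the same execution_date in production anyway, but it is something I ran into while testing, and you should not be alarmed by it. Simply clear the old job or use a different datetime.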

Regarding "triggers - Airflow TriggerDagRunOperator: how to change the execution date", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47808520/
