- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
最近我对 Airflow 进行了太多测试,以至于 execution_date
有一个问题运行时 airflow trigger_dag <my-dag>
.
我了解到 execution_date
不是我们第一次从here想到的:
Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.
start_date = datetime.combine(datetime.today(),
datetime.min.time())
args = {
"owner": "xigua",
"start_date": start_date
}
dag = DAG(dag_id="hadoopprojects", default_args=args,
schedule_interval=timedelta(days=1))
wait_5m = ops.TimeDeltaSensor(task_id="wait_5m",
dag=dag,
delta=timedelta(minutes=5))
以上代码是我日常工作流程的开始部分,第一个任务是一个 TimeDeltaSensor,它在实际工作之前再等待 5 分钟,所以这意味着我的 dag 将在 2016-09-09T00:05:00
被触发。 , 2016-09-10T00:05:00
……等
在 Web UI 中,我可以看到类似 scheduled__2016-09-20T00:00:00
的内容, 任务在 2016-09-21T00:00:00
处运行,根据 ETL
这似乎是合理的模型。
但是有一天我的 dag 由于未知原因没有被触发,所以我手动触发它,如果我在 2016-09-20T00:10:00
触发它,那么 TimeDeltaSensor 将等到 2016-09-21T00:15:00
运行前。
这不是我想要的,我希望它在 2016-09-20T00:15:00
运行不是第二天,我试过了execution_date
通过--conf '{"execution_date": "2016-09-20"}'
, 但它不起作用。
我该如何处理这个问题?
$ airflow version
[2016-09-21 17:26:33,654] {__init__.py:36} INFO - Using executor LocalExecutor
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.7.1.3
最佳答案
首先,我建议您对 start_date
使用常量,因为根据调度程序评估的 Airflow 管道,动态的行为会不可预测。
关于 start_date
的更多信息在我写的 FAQ 条目中,将所有这些整理出来: https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
现在,关于 execution_date
当它被触发时,这是使用 Airflow 的人的常见问题。 Airflow 套execution_date
基于它所覆盖的计划周期的左边界,而不是基于它触发的时间(这将是该周期的右边界)。运行 schedule='@hourly'
时task 例如,一个任务将每小时触发一次。下午 2 点触发的任务将有一个 execution_date
。下午 1 点,因为它假定您在下午 2 点处理下午 1 点到下午 2 点的时间窗口。同样,如果您运行日常作业,则运行 execution_date
的 2016-01-01
将在 2016-01-02
午夜后不久触发.
在考虑 ETL 和差异负载时,这种左边界标记很有意义,但在考虑简单的、类似 cron 的调度程序时会变得困惑。
关于python - airflow trigger_dag execution_date 是第二天,为什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39612488/
在Airflow中,我一直在使用“airflow run”和“airflow test”,但不完全理解它们有何不同。他们有什么区别? 最佳答案 我自己通读了文档,发现它是多么令人困惑。 Airflow
我使用 Airflow 已经有一段时间了,它是由一位同事创建的。最近我遇到了一些错误,这需要我更深入地了解如何修复 Airflow 中的某些问题。 我确实理解这三个进程是什么,但我只是不明白运行它们时
AIRFLOW_HOME=/path/to/my/airflow_home 我收到这个警告... >airflow trigger_dag python_dag3 /Users/alexryan/mi
有没有人报告过他们在他们的公司中让 Airflow 扩展了多少?我正在考虑实现 Airflow 来执行 5,000 多个任务,每个任务每小时运行一次,有一天可以将其扩展到 20,000 多个任务。在检
问题 :我想使用 Github 上最新版本的 Apache-Airflow 安装 apache-airflow 以及所有依赖项? 我怎样才能使用 pip 做到这一点? 在生产环境中使用它是否安全? 最
我们在 AWS ECS 上运行 Airflow,并将所有 DAG 捆绑在一个 Docker 镜像中。我们不时更新 DAGS,并部署新版本的 Docker Image。当我们这样做时,ECS 将终止正在
问题很简单。我需要限制 Airflow 网络用户仅查看和执行某些 DAG 和任务。 如果可能,我宁愿不使用 Kerberos也不是 OAuth . Multi-tenancy option 似乎是一个
我们正在使用 Airflow 2.00。我正在尝试实现一个做两件事的 DAG: 通过 API 触发报告 从源到目标下载报告。 任务 1 和任务 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有
对于一项任务,有许多辅助任务 - 从文件/数据库中获取/保存属性、验证、审计。这些辅助方法并不耗时。 一个示例 DAG 流, fetch_data >> actual_processing >> va
有什么方法可以重新加载作业而不必重新启动服务器吗? 最佳答案 在airflow.cfg中,您具有以下两种配置来控制此行为: # after how much time a new DAGs shoul
我们可以通过将任务/dag 超时设置为 None 并手动触发其运行来使用 Airflow dag 来定义永无止境的作业(即具有无条件循环以消耗流数据的任务)吗?让 Airflow 监测永无止境的任务会
我是 Airflow 的新手,最近开始探索这个工具。我在 18.4 版本的 ubuntu 机器上安装了 1.10.10 版。从设置和安装的角度来看,一切正常,但是我在任何 DAG 中的任务都没有运行,
我主要看到Airflow被用于ETL / Bid数据相关的工作。我正在尝试将其用于业务工作流,其中用户操作将来会触发一组相关任务。其中某些任务可能需要根据某些其他用户操作来清除(删除)。 我认为最好的
我有一个 DAG,只要 FileSensor 检测到文件,它就会使用它,为每个文件生成任务,以 (1) 将文件移动到暂存区域,(2) 触发单独的 DAG 来处理文件。 FileSensor -> Mo
我需要手动或以编程方式执行的管道,可以使用 Airflow 吗?看起来现在每个工作流程都必须与时间表绑定(bind)。 最佳答案 只需在创建 DAG 时将 schedule_interval 设置为
所以这是一个愚蠢的想法...... 我在 Airflow 中创建了(许多)DAG...并且它有效...但是,我想以某种方式将其打包,以便我可以在不安装 Airflow 的情况下运行单个 DAG 运行;
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我们正在尝试将 MongoHook 和 GCSToLocalFilesystemOperator 导入到我们的 Airflow 项目中: docs for MongoHook docs for GCS
启动 Airflow 网络服务器时出现以下错误 balajee@Balajees-MacBook-Air.local:~$ Airflow 网络服务器 -p 8080 [2018-12-03 00:2
运行pip install airflow[postgres]命令后出现以下错误: > raise RuntimeError("By default one of Airflow's dependen
我是一名优秀的程序员,十分优秀!