gpt4 book ai didi

error-handling - 撤消/回滚数据处理管道的影响

转载 作者:行者123 更新时间:2023-12-04 04:32:45 28 4
gpt4 key购买 nike

我有一个工作流,我将描述如下:

[  Dump(query)  ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
|
[ Schema(query) ] ---+

地点:

  • query 是对 RDBMS 的查询
  • Dump 将结果 query 转储到 CSV 文件 dump
  • Schema 运行queryxcoms 其架构schema
  • Parquet 读取csv 并使用schema 创建Parquet 文件parquet
  • Hive 根据Parquet文件创建Hive表parquet

这个以某种方式令人费解的工作流程背后的原因是由于无法解决且超出问题范围的限制(但是,理想情况下它会比这简单得多)。

我的问题是关于在发生故障时回滚管道的影响

这些是我希望在不同条件下发生的回滚:

  • dump 应该始终被删除,无论管道的最终结果如何
  • parquet 如果由于某种原因 Hive 表创建失败,则应删除

在工作流程中表示这一点,我可能会这样写:

[  Dump(query)  ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
| | |
[ Schema(query) ] ---+ | |
v v
[ DeleteParquetOutput ] --> [ DeleteDumpOutput ]

只有在发生错误时才会执行从 ParquetDeleteParquetOutput 的转换,并且转换到 DeleteDumpOutput 时会忽略它的任何失败依赖关系。

这应该可以解决它,但我相信更复杂的管道可能会因这种错误处理逻辑而增加复杂性而受到很大影响

在继续讨论更多细节之前,我的问题是:在处理 Airflow 管道中的错误时,这可以被视为一种好的做法吗?有什么不同的(并且可能更可持续)方法?

如果您对我想如何解决这个问题更感兴趣,请继续阅读,否则请随时回答和/或发表评论。


我对管道中错误处理的看法

理想情况下,我想做的是:

  • 为每个相关阶段定义一个回滚过程
  • 对于每个回滚过程,定义它是只在失败的情况下发生还是在任何情况下发生
  • 当管道完成时,反转依赖关系,并从最后一个成功的任务开始,遍历反转的 DAG 并运行相关的回滚过程(如果适用)
  • 应记录回滚过程中的错误,但不应考虑以完成整个管道的回滚
  • 为了保持之前的观点,每个任务都应该定义一个单独的效果,其回滚过程可以在不引用其他任务的情况下进行描述

让我们用给定的管道做几个例子。

场景 1:成功

我们反转 DAG 并用它的强制回滚过程(如果有的话)填充每个任务,得到这个

                                         +---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: None ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here

场景二:Hive发生故障

                                                 +---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: UNDO (error) ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here

有什么方法可以在 Airflow 中表示这样的东西吗?我也乐于评估不同的工作流自动化解决方案,如果它们支持这种方法的话。

最佳答案

BaseOperator所有运算符和传感器都派生自的类支持回调:on_success_callback , on_retry_callbackon_failure_callback -- 也许这些会有所帮助。

关于error-handling - 撤消/回滚数据处理管道的影响,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49056230/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com