- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著.
从Spark 执行计划中获取数据血缘 。
因为数据处理任务会涉及到数据的转换和处理,所以从数据任务中解析血缘也是获取数据血缘的渠道之一,Spark 是大数据中数据处理最常用的一个技术组件,既可以做实时任务的处理,也可以做离线任务的处理。Spark在执行每一条SQL语句的时候,都会生成一个执行计划,这一点和很多数据库的做法很类似,都是SQL语句在执行时,先生成执行计划。如下图3-1-10所示,在Spark的官方文档链接https://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html#content中,有明确提到,可以根据EXPLAIN关键字来获取执行计划,这和很多数据库查看执行计划的方式很类似.
图3-1-10 。
Spark底层生成执行计划以及处理执行计划的过程如下图3-1-11所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著.
。
图3-1-11 。
从图中可以看到, 。
1、 执行SQL语句或者Data Frame时,会先生成一个Unresolved Logical Plan,就是没有做过任何处理和分析的逻辑执行计划,仅仅会从SQL语法的角度做一些基础性的校验.
2、 之后通过获取Catalog的数据,对需要执行的SQL语句做表名、列名的进一步分析校验,从而生成一个可以直接运行的逻辑执行计划.
3、 但是Spark底层会有个优化器来生成一个最优的执行操作方式,从而生成一个优化后的最佳逻辑执行计划.
4、 将最终确定下来的逻辑执行计划转换为物理执行计划,转换为最终的代码进行执行.
Spark的执行计划其实就是数据处理的过程计划,会将SQL语句或者DataFrame 做解析,并且结合Catalog一起,生成最终数据转换和处理的代码。所以可以从Spark的执行计划中,获取到数据的转换逻辑,从而解析到数据的血缘。但是spark的执行计划都是在spark底层内部自动处理的,如何获取到每次Spark任务的执行计划的信息呢?其实在Spark底层有一套Listener的架构设计,可以通过Spark Listener 来获取到spark 底层很多执行的数据信息.
在spark的源码中,以Scala的形式提供了一个org.apache.spark.sql.util.QueryExecutionListener trait (类似Java 语言的接口),来作为Spark SQL等任务执行的监听器。在org.apache.spark.sql.util.QueryExecutionListener 中提供了如下表3-1-2所示的两个方法.
表3-1-2 。
方法名 。 |
描述 。 |
def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit 。 |
执行成功时,调用的方法,其中包括了执行计划参数,这里的执行计划可以是逻辑计划或者物理计划 。 |
def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit 。 |
执行失败时,调用的方法,其中同样也包括了执行计划参数,这里的执行计划可以是逻辑计划或者物理计划 。 |
因此可以借用QueryExecutionListener 来主动让Spark在执行任务时,将执行计划信息推送到自己的系统或者数据库中,然后再做进一步的解析,如下图3-1-12所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著.
图3-1-12 。
import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener case class PlanExecutionListener(sparkSession: SparkSession) extends QueryExecutionListener with Logging{ override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) { // 执行成功时,调用解析执行计划的方法 planParser(qe) } override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) { } private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = { try body catch { case NonFatal(e) => val ctx = qe.sparkSession.sparkContext logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e) } } def planParser(qe: QueryExecution): Unit = { logInfo("----------- start to get spark analyzed LogicPlan--------") //解析执行计划,并且将执行计划的数据发送到自有的系统或者数据库中 ...... } }
上面的代码中,实现了QueryExecutionListener 这个trait中的onSuccess和onFailure这两个方法,只有在onSuccess时,才需要获取执行计划的数据,因为只有onSuccess时的血缘才是有效的.
实现好了自定义的QueryExecutionListener后,可以通过sparkSession.listenerManager.register来将自己实现的PlanExecutionListener 注册到Spark会话中,listenerManager是Spark中Listener的管理器.
在获取到执行计划时,需要再结合Catalog一起,来进一步解析血缘的数据,如下图3-1-13所示 。
图3-1-13 。
Spark 中常见的执行计划实现类如下表3-1-3所示,获取数据血缘时,就是需要从如下的这些执行计划中解析血缘关系。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著.
表3-1-3 。
执行计划实现类 。 |
描述 。 |
org.apache.spark.sql.execution.datasources.LogicalRelation 。 |
一般用于解析字段级的关联关系 。 |
org.apache.spark.sql.catalyst.catalog.HiveTableRelation 。 |
Hive 表关联关系的执行计划,一般用于SQL执行时,存在关联查询的情况会出现该执行计划. |
org.apache.spark.sql.hive.execution.InsertIntoHiveTable 。 |
一般是在执行insert into 的SQL 语句时才会产生的执行计划,例如insert into xxx_table(colum1,column2) values("4","zhangsan") 。 |
org.apache.spark.sql.execution.datasources 。 .InsertIntoHadoopFsRelationCommand 。 |
一般用于执行类似 sparkSession 。 .read 。 .table("xx_source_table ") 。 .limit(10) 。 .write 。 .mode(SaveMode.Append) 。 .insertInto("xx_target_table ")产生的执行计划. |
org.apache.spark.sql.hive.execution. 。 CreateHiveTableAsSelectCommand 。 |
一般是在执行create table xxx_table as的SQL 语句时才会产生的执行计划,例如create table xx_target_table as select * from xx_source_table 。 |
org.apache.spark.sql.execution.command 。 .CreateDataSourceTableAsSelectCommand 。 |
一般用于执行类似sparkSession 。 .read 。 .table("xx_source_table") 。 .limit(10) 。 .write 。 .mode(SaveMode.Append) 。 .saveAsTable("xx_target_table")产生的执行计划. |
org.apache.spark.sql.execution.datasources 。 .InsertIntoDataSourceCommand 。 |
一般用于将SQL查询结果写入到一张表中,比如insert into xxx_target_table select * from xxx_source_table 。 |
。
如下是以org.apache.spark.sql.execution.datasources 。
.InsertIntoHadoopFsRelationCommand 为例的spark 执行计划的数据,如下数据已经将原始的执行计划转换为了json格式的数据,方便做展示.
.................更多内容,请参考清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著 。
。
。
最后此篇关于图书《数据资产管理核心技术与应用》核心章节节选-3.1.2.从Spark执行计划中获取数据血缘的文章就讲到这里了,如果你想了解更多关于图书《数据资产管理核心技术与应用》核心章节节选-3.1.2.从Spark执行计划中获取数据血缘的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!