- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章SparkSQL简介及运行原理由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
。
。
Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。 Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式.
。
将Spark SQL转化为RDD,然后提交到集群执行.
。
(1)容易整合,Spark SQL已经集成在Spark中 。
(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问 。
(3)兼容 Hive 。
(4)标准的数据连接:JDBC、ODBC 。
。
。
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格.
DataFrame是组织成命名列的数据集.
它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化.
关系型数据库中的表由表结构和数据组成,而DataFrame也类似,由schema(结构)和数据组成,其数据集是RDD.
DataFrame可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD 。
。
。
上图展示了Spark的模块及各模块之间的关系:
底层是Spark-core核心模块,Spark每个模块都有一个核心抽象,Spark-core的核心抽象是RDD, 。
Spark SQL等都基于RDD封装了自己的抽象,在Spark SQL中是DataFrame/DataSet.
相对来说RDD是更偏底层的抽象,DataFrame/DataSet是在其上做了一层封装,做了优化,使用起来更加方便.
从功能上来说,DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做.
。
DataFrame与RDD的主要区别在于:
DataFrame 。
DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型.
使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标.
RDD 。
RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化.
DataFrame和RDD联系:
DataFrame底层是以RDD为基础的分布式数据集,和RDD的主要区别的是:RDD中没有schema信息,而DataFrame中数据每一行都包含schema 。
DataFrame = RDD[Row] + shcema 。
。
。
SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能.
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD.
对于每个其他的API,我们需要使用不同的context.
例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext.
但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点.
SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来.
。
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的.
SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的.
。
----为用户提供一个统一的切入点使用Spark各项功能 。
----允许用户通过它调用DataFrame和Dataset相关 API来编写程序 。
----减少了用户需要了解的一些概念,可以很容易的与Spark进行交互 。
----与Spark交互之时不需要显示的创建SparkConf, SparkContext以及 SQlContext,这些对象已经封闭在SparkSession中 。
。
。
case class People(val name:String,val age:Int) //可以声明数据类型object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //设置运行模式为本地运行,不然默认是集群模式 //conf.setMaster("local") //默认是集群模式 //设置任务名 conf.setAppName("WordCount").setMaster("local") conf.set("spark.default.parallelism","5") //设置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根据SparkContext生成SQLContext val array = Array("mark,14","kitty,23","dasi,45") val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD People(line.split(",")(0),line.split(",")(1).trim().toInt) }) import Sqlsc.implicits._ //引入全部方法 //将RDD转换成DataFrame val df = peopleRDD.toDF() //将DataFrame转换成一个临时的视图 df.createOrReplaceTempView("people") //使用SQL语句进行查询 Sqlsc.sql("select * from people").show() }}
。
object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //设置运行模式为本地运行,不然默认是集群模式 //conf.setMaster("local") //默认是集群模式 //设置任务名 conf.setAppName("WordCount").setMaster("local") conf.set("spark.default.parallelism","5") //设置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根据SparkContext生成SQLContext val array = Array("mark,14","kitty,23","dasi,45") //1.需要将RDD数据映射成Row,需要引入import org.apache.spark.sql.Row val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD val fields = line.split(",") Row(fields(0),fields(1).trim().toInt) }) //2.创建StructType定义结构 val st:StructType = StructType( //字段名,字段类型,是否可以为空 List( //传参是列表类型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil来构建列表 StructField("name",StringType,true), StructField("age",IntegerType,true) ) ) //3.使用SparkSession建立DataFrame val df = Sqlsc.createDataFrame(peopleRDD,st) //将DataFrame转换成一个临时的视图 df.createOrReplaceTempView("people") //使用SQL语句进行查询 Sqlsc.sql("select * from people").show() }}
。
[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]
def main(args:Array[String]):Unit={ val conf = new SparkConf() //设置运行模式为本地运行,不然默认是集群模式 //conf.setMaster("local") //默认是集群模式 //设置任务名 conf.setAppName("WordCount").setMaster("local") //设置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根据SparkContext生成SQLContext //通过json数据直接创建DataFrame val df = Sqlsc.read.json("E:1.json") //将DataFrame转换成一个临时的视图 df.createOrReplaceTempView("people1") //使用SQL语句进行查询 Sqlsc.sql("select * from people1").show() }
。
。
视图是一个虚表,跟Mysql里的概念是一样的,视图基于实际的表而存在,其实质是一系列的查询语句 。
。
局部视图(Temoporary View):只在当前会话中有效,如果创建它的会话终止,则视图也会消失.
全局视图(Global Temporary View): 在全局范围内有效,不同的Session中都可以访问,生命周期是Spark的Application运行周期,全局视图会绑定到系统保留的数据库global_temp中,因此使用它的时候必须加上相应前缀.
。
创建局部视图:df.createOrReplaceTempView("emp") 创建全局视图:df.createOrReplaceGlobalTempView("empG") 。
。
spark.sql("select * from emp").show spark.sql("select * from global_temp.empG").show //查询全局视图,需要添加前缀 。
。
spark.newSession.sql("select * from emp").show -----> 报错,Table or View Not Found spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查询 。
。
。
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //方式一 val df1 = sqlContext.read.json("E:666people.json") val df2 = sqlContext.read.parquet("E:666users.parquet") //方式二 val df3 = sqlContext.read.format("json").load("E:666people.json") val df4 = sqlContext.read.format("parquet").load("E:666users.parquet") //方式三,默认是parquet格式 val df5 = sqlContext.load("E:666users.parquet") //方式四,使用MySQL进行数据源读取 val url = "jdbc:mysql://192.168.123.102:3306/hivedb" val table = "dbs" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root") //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码) val df = sqlContext.read.jdbc(url,table,properties) df.createOrReplaceTempView("dbs") sqlContext.sql("select * from dbs").show()
使用Hive作为数据源:需要在pom.xml文件中添加依赖 。
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.0</version> </dependency>
开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下 。
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> <!-- 如果 mysql 和 hive 在同一个服务器节点,那么请更改 hadoop02 为 localhost --> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> <description>hive default warehouse, if nessecory, change it</description> </property> </configuration>hive-site.xml配置文件
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) sqlContext.sql("select * from myhive.student").show()
。
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df1 = sqlContext.read.json("E:666people.json") //方式一 df1.write.json("E:111") df1.write.parquet("E:222") //方式二 df1.write.format("json").save("E:333") df1.write.format("parquet").save("E:444") //方式三 df1.write.save("E:555")
。
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:444")
。
Dataset也是一个分布式数据容器,简单来说是类似二维表,Dataset里头存有schema数据结构信息和原生数据,Dataset的底层封装的是RDD,当RDD的泛型是Row类型的时候,我们也可以称它为DataFrame。即Dataset<Row> = DataFrame。DataFrame是特殊的Dataset.
Spark整合了Dataset和DataFrame,前者是有明确类型的数据集,后者是无明确类型的数据集。根据官方的文档:
Dataset是一种强类型集合,与领域对象相关,可以使用函数或者关系进行分布式的操作。 每个Dataset也有一个无类型的视图,叫做DataFrame,也就是关于Row的Dataset。 简单来说,Dataset一般都是Dataset[T]形式,这里的T是指数据的类型,如上图中的Person,而DataFrame就是一个Dataset[Row].
Datasets是懒加载的,即只有actions被调用的时候才会触发计算。在内部,Dataset代表一个逻辑计划,用来描述产生数据需要的计算。当一个action被调用的时候,Spark的query优化器会优化这个逻辑计划并以分布式的方式在物理上进行实际的计算操作.
。
(1,"Tom") (2,"Mary")
测试数据 。
(1)定义case class case class MyData(a:Int,b:String) (2)使用序列创建DataSet val DS = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS 。
。
(1)定义case class case class Person(name:String,age:BigInt) (2)读入JSON的数据 val df = spark.read.json("/root/temp/people.json") (3)将DataFrame转换成DataSet val PersonDS =df.as[Person] 。
。
(1)读取HDFS的文件,直接创建DataSet val lineDS = spark.read.text("hdfs://bigdata111:9000/input/data.txt").as[String] (2)分词操作,查询长度大于3的单词 val words = lineDS.flatMap(_.split(" ")).filter(_.length > 3) words.show words.collect 。
到此这篇关于SparkSQL简介及运行原理的文章就介绍到这了,更多相关SparkSQl使用内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://www.cnblogs.com/ssyfj/p/12616453.html 。
最后此篇关于SparkSQL简介及运行原理的文章就讲到这里了,如果你想了解更多关于SparkSQL简介及运行原理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
Hive 外部表指向 S3 上的文件,ddl 包括按 eod 子句分区。一个文件夹下有 5 个子文件夹,每个子文件夹下面都有一个文件,用于不同的 partition_date。即 eod=201806
SparkSQL / DataFrames HBase-Spark连接器(在HBase-Spark模块中)利用Spark-1.2.0中引入的DataSource API (SPARK-3247),弥
我将 RDD[myClass] 转换为数据帧,然后将其注册为 SQL表 my_rdd.toDF().registerTempTable("my_rdd") 该表是可调用的,可以使用以下命令进行演示 %
我在这看到 DataBricks post ,SparkSql 中支持窗口函数,特别是我正在尝试使用 lag() 窗口函数。 我有几行信用卡交易,我已经对它们进行了排序,现在我想遍历这些行,并为每一行
我正在为 hive 使用远程 mysql 元存储。当我运行 hive 客户端时,它运行完美。但是当我尝试通过 spark-shell 或 spark-submit 使用 spark-sql 时,我无法
我有一个 Spark 作业,它正在将数据从 CSV 文件加载到 MySQL 数据库中。 一切正常,但最近我注意到 Spark 在插入阶段打开了许多连接(300 多个连接)。感觉就像每个插入语句都打开一
这段代码来自 Spark Programming Guide , # The result of loading a parquet file is also a DataFrame. parquet
我有一个 Scala spark DataFrame: df.select($"row_id", $"array_of_data").show +----------+----------------
我设计了以下函数来处理任何数字类型的数组: def array_sum[T](item:Traversable[T])(implicit n:Numeric[T]) = item.sum // Reg
我通过 df.saveAsTable 创建了一个持久表 当我运行以下查询时,我会收到这些结果 spark.sql("""SELECT * FROM mytable """).show() 我可以查看
我想通过 sparksql 删除一个配置单元表。 在安装了 hadoop 2.6、hive 2.0、spark 1.6 和 spark 2.0 的集群中。我在两个版本的 pyspark shell 和
我有一个要求,我需要计算 SparkSQL 中 Hive 表的重复行数。 from pyspark import SparkContext, SparkConf from pyspark.sql im
我有一个连接到 Postgres 数据库的 SparkSQL 的非常简单的设置,我正在尝试从一个表中获取一个 DataFrame,该 DataFrame 具有 X 个分区(假设为 2)。代码如下: M
有什么方法可以在 sparksql 中实现存储过程或函数等 sql 功能? 我知道 hbase 中的 hpl sql 和协处理器。但是想知道 spark 中是否有类似的东西。 最佳答案 您可以考虑使用
我正在使用 cloudera vm 10.0,spark 版本为 1.6。 登录 pyspark 控制台后,我正在尝试以下语句从配置单元中获取数据 sqlContext.sql("select * f
我想用 Spark SQL 2.0 执行以下查询 SELECT a.id as id, (SELECT SUM(b.points) FROM tableB b WHERE b.id = a.i
我正在使用 spark sql 对我的数据集运行查询。查询的结果很小,但仍然是分区的。 我想合并生成的 DataFrame 并按列对行进行排序。我试过 DataFrame result = spark
任何人都直接在 HBase 表上使用 SparkSQL,就像在 Hive 表上使用 SparkSQL。 我是spark新手。请指导我如何连接hbase和spark。如何查询hbase表。 最佳答案 A
我正在尝试从 SparkSQL 表(S3 中的 Parquet )中有效地选择单个分区。但是,我看到 Spark 打开表中所有 Parquet 文件的证据,而不仅仅是那些通过过滤器的文件。对于具有大量
我尝试使用 SparkSQL (v.1.3.0) 访问 PostgreSQL 数据库。在这个数据库中,我有一个表 CREATE TABLE test ( id bigint, values dou
我是一名优秀的程序员,十分优秀!