- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
HBase-Spark连接器(在HBase-Spark模块中)利用Spark-1.2.0中引入的DataSource API (SPARK-3247),弥补了简单HBase KV存储和复杂关系SQL查询之间的差距,使用户能够使用Spark在HBase上执行复杂的数据分析工作。HBase Dataframe是标准的Spark Dataframe,能够与任何其他数据源(如Hive,Orc,Parquet,JSON等)进行交互。HBase-Spark Connector应用关键技术,如分区修剪,列修剪,谓词叠加和数据局部性。
要使用HBase-Spark连接器,用户需要为HBase和Spark表之间的模式映射定义Catalog,准备数据并填充HBase表,然后加载HBase DataFrame。之后,用户可以使用SQL查询在HBase表中进行集成查询和访问记录。以下说明了基本程序。
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
Catalog定义了HBase和Spark表之间的映射。该目录有两个关键部分。一个是rowkey定义,另一个是Spark中的表列与HBase中的列族和列限定符之间的映射。上面定义了一个HBase表的模式,其名称为table1,行键为key,列数为col1 -col8。请注意,还必须将rowkey详细定义为column (col0),该列具有特定的cf(rowkey)。
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord
{
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i: $t",
i.toByte)
}
}
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark ")
.save()
用户准备的data是一个本地Scala集合,它有256个HBaseRecord对象。 sc.parallelize(data)函数分配data以形成RDD。toDF返回一个DataFrame。 writefunction返回一个DataFrameWriter,它用于将DataFrame写入外部存储系统(例如,HBase)。给定具有指定模式的DataFrame catalog,save函数将创建一个包含5个区域的HBase表,并将DataFrame保存在其中。
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val df = withCatalog(catalog)
在’withCatalog’函数中,sqlContext是SQLContext的变量,它是在Spark中处理结构化数据(行和列)的入口点。 read返回一个DataFrameReader,可用于以DataFrame的形式读取数据。 option函数将基础数据源的输入选项添加到DataFrameReader,format函数指定DataFrameReader的输入数据源格式。该load()函数将输入作为DataFrame加载。withCatalog函数返回的日期框df可用于访问HBase表。
val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005" ||
$"col0" <= "row005")
.select("col0", "col1", "col4")
s.show
DataFrame可以执行各种操作,例如join,sort,select,filter,orderBy等。上面的df.filter使用给定的SQL表达式过滤行。select选择一组列: col0,col1和col4。
df.registerTempTable("table1")
sqlContext.sql("select count(col1) from table1").show
registerTempTabledf使用表名将DataFrame注册为临时表table1。此临时表的生命周期与用于创建df的SQLContext相关联。sqlContext.sql函数允许用户执行SQL查询。
示例-使用不同时间戳的查询
在HBaseSparkConf中,可以设置与时间戳相关的四个参数。它们分别是TIMESTAMP,MIN_TIMESTAMP,MAX_TIMESTAMP和MAX_VERSIONS。用户可以使用MIN_TIMESTAMP和MAX_TIMESTAMP查询具有不同时间戳或时间范围的记录。与此同时,在下面的示例中使用具体值而不是tsSpecified和oldMs。
下面的示例显示了如何使用不同的时间戳加载df DataFrame。tsSpecified由用户指定。HBaseTableCatalog定义HBase和Relation关系模式。writeCatalog定义模式映射的目录。
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
下面的示例显示了如何加载具有不同时间范围的df DataFrame。oldMs由用户指定。
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
加载df DataFrame后,用户可以查询数据。
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
示例-本地Avro支持
HBase-Spark Connector支持不同的数据格式,如Avro,Jason等。下面的用例显示了spark是如何支持Avro的。用户可以直接将Avro记录保存到HBase中。在内部,Avro模式自动转换为本机Spark Catalyst数据类型。请注意,HBase表中的两个键值部分都可以用Avro格式定义。
1)定义模式映射的目录:
def catalog = s"""{
|"table":{"namespace":"default", "name":"Avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
|}
|}""".stripMargin
catalog是名为Avrotable的HBase表的模式。行键作为键和一列col1。还必须将rowkey详细定义为column (col0),该列具有特定的cf(rowkey)。
2)准备数据:
object AvroHBaseRecord {
val schemaString =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [
| {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]},
| {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
| {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
| ] }""".stripMargin
val avroSchema: Schema = {
val p = new Schema.Parser
p.parse(schemaString)
}
def apply(i: Int): AvroHBaseRecord = {
val user = new GenericData.Record(avroSchema);
user.put("name", s"name${"%03d".format(i)}")
user.put("favorite_number", i)
user.put("favorite_color", s"color${"%03d".format(i)}")
val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
favoriteArray.add(s"number${i}")
favoriteArray.add(s"number${i+1}")
user.put("favorite_array", favoriteArray)
import collection.JavaConverters._
val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
user.put("favorite_map", favoriteMap)
val avroByte = AvroSedes.serialize(user, avroSchema)
AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
}
}
val data = (0 to 255).map { i =>
AvroHBaseRecord(i)
}
首先定义schemaString,然后解析得到avroSchema。avroSchema用于生成AvroHBaseRecord。用户准备的data是一个包含256个AvroHBaseRecord对象的本地Scala集合。
3)保存DataFrame:
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
给定具有指定模式的数据框catalog,上面将创建一个包含5个区域的HBase表,并将数据框保存在其中。
4)加载DataFrame
def avroCatalog = s"""{
|"table":{"namespace":"default", "name":"avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
val df = withCatalog(catalog)
在withCatalog函数中,read返回一个DataFrameReader,可用于以DataFrame的形式读取数据。该option函数将基础数据源的输入选项添加到DataFrameReader。有两个选项:一个是设置avroSchema为AvroHBaseRecord.schemaString,一个是设置HBaseTableCatalog.tableCatalog为avroCatalog。该load()函数将输入作为DataFrame加载。withCatalog函数返回的日期框df可用于访问HBase表。
5)SQL查询:
df.registerTempTable("avrotable")
val c = sqlContext.sql("select count(1) from avrotable").
加载df DataFrame后,用户可以查询数据。registerTempTable使用表名avrotable将df DataFrame注册为临时表。sqlContext.sql函数允许用户执行SQL查询。
假设我有 3 个 DataFrame。其中一个 DataFrame 的列名不在其他两个中。 using DataFrames df1 = DataFrame([['a', 'b', 'c'], [1,
假设我有 3 个 DataFrame。其中一个 DataFrame 的列名不在其他两个中。 using DataFrames df1 = DataFrame([['a', 'b', 'c'], [1,
我有一个 largeDataFrame(多列和数十亿行)和一个 smallDataFrame(单列和 10,000 行)。 只要 largeDataFrame 中的 some_identifier 列
我有一个函数,可以在其中规范化 DataFrame 的前 N 列。我想返回规范化的 DataFrame,但不要管原来的。然而,该函数似乎也会对传递的 DataFrame 进行变异! using D
我想在 Scala 中使用指定架构在 DataFrame 上创建。我尝试过使用 JSON 读取(我的意思是读取空文件),但我认为这不是最佳实践。 最佳答案 假设您想要一个具有以下架构的数据框: roo
我正在尝试从数据框中删除一些列,并且不希望返回修改后的数据框并将其重新分配给旧数据框。相反,我希望该函数只修改数据框。这是我尝试过的,但它似乎并没有做我所除外的事情。我的印象是参数是作为引用传递的,而
我有一个包含大约 60000 个数据的庞大数据集。我会首先使用一些标准对整个数据集进行分组,接下来我要做的是将整个数据集分成标准内的许多小数据集,并自动对每个小数据集运行一个函数以获取参数对于每个小数
我遇到了以下问题,并有一个想法来解决它,但没有成功:我有一个月内每个交易日的 DAX 看涨期权和看跌期权数据。经过转换和一些计算后,我有以下 DataFrame: DaxOpt 。现在的目标是消除没有
我正在尝试做一些我认为应该是单行的事情,但我正在努力把它做好。 我有一个大数据框,我们称之为lg,还有一个小数据框,我们称之为sm。每个数据帧都有一个 start 和一个 end 列,以及多个其他列所
我有一个像这样的系列数据帧的数据帧: state1 state2 state3 ... sym1 sym
我有一个大约有 9k 行和 57 列的数据框,这是“df”。 我需要一个新的数据框:'df_final'- 对于“df”的每一行,我必须将每一行复制“x”次,并将每一行中的日期逐一增加,也就是“x”次
假设有一个 csv 文件如下: # data.csv 0,1,2,3,4 a,3.0,3.0,3.0,3.0,3.0 b,3.0,3.0,3.0,3.0,3.0 c,3.0,3.0,3.0,3.0,3
我只想知道是否有人对以下问题有更优雅的解决方案: 我有两个 Pandas DataFrame: import pandas as pd df1 = pd.DataFrame([[1, 2, 3], [
我有一个 pyspark 数据框,我需要将其转换为 python 字典。 下面的代码是可重现的: from pyspark.sql import Row rdd = sc.parallelize([R
我有一个 DataFrame,我想在 @chain 的帮助下对其进行处理。如何存储中间结果? using DataFrames, Chain df = DataFrame(a = [1,1,2,2,2
我有一个包含 3 列的 DataFrame,名为 :x :y 和 :z,它们是 Float64 类型。 :x 和 "y 在 (0,1) 上是 iid uniform 并且 z 是 x 和 y 的总和。
这个问题在这里已经有了答案: pyspark dataframe filter or include based on list (3 个答案) 关闭 2 年前。 只是想知道是否有任何有效的方法来过
我刚找到这个包FreqTables ,它允许人们轻松地从 DataFrames 构建频率表(我正在使用 DataFrames.jl)。 以下代码行返回一个频率表: df = CSV.read("exa
是否有一种快速的方法可以为 sort 指定自定义订单?/sort!在 Julia DataFrames 上? julia> using DataFrames julia> srand(1); juli
在 Python Pandas 和 R 中,可以轻松去除重复的列 - 只需加载数据、分配列名,然后选择那些不重复的列。 使用 Julia Dataframes 处理此类数据的最佳实践是什么?此处不允许
我是一名优秀的程序员,十分优秀!