- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
@ 。
在同一分区中创建新的文件组集,现有的文件组被标记为 “删除”,根据新记录的数量创建新的文件组.
COW流程如下 。
MOR流程如下 。
用来生成 HoodieKey(record key + partition path),目前支持以下策略:
通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:
Hudi使用Spark-2.4.3+和Spark 3。x版本。Hudi支持的Spark版本如下
解压spark-3.3.0-bin-hadoop3.tgz,配置Spark环境变量 。
vim /etc/profile
export SPARK_HOME=/home/commons/spark-3.3.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
source /etc/profile
然后将前面编译的hudi-spark3.3-bundle_2.12-0.12.1.jar(在hudi的根目录下packaging/hudi-spark-bundle/target/,至于如何编译请看前面的内容)拷贝到Spark根目录下Jars目录.
不同版本(Spark 3.3、Spark 3.2、Spark 3.1、Spark 2.4)的spark-shell启动命令有所不同,下面以Spark 3.3来操作演示.
spark-shell \
--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
接下来设置表名、基本路径和数据生成器 。
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
DataGenerator可以根据旅行应用生成相应的样例数据插入和更新;spark中不需要单独的create table命令如果表不存在,第一批写入操作将创建该表.
接下来通过DataGenerator生成一些新的行程数据,将它们加载到DataFrame中,并将DataFrame写入Hudi表中.
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
mode为Overwrite如果表存在则覆盖重新创建表。可以从basePath = "file:///tmp/hudi_trips_cow" 配置的本地文件目录查看hoodie的元数据和数据的变化.
还可以通过外部化配置文件,可以在配置文件Hudi -default.conf中集中设置配置,而不是直接将配置设置传递给每个Hudi作业.
先转成spark的df,然后再执行spark sql的查询 。
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
每个hoodie表固定加了如下的五个字段,hoodie提交时间、hoodie提交序号、hoodie记录键、hoodie分区路径、hoodie文件名.
类似于插入新数据,同样使用数据生成器生成新的行程的数据,加载到DataFrame中,并将DataFrame写入hudi表.
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
注意,现在保存模式是追加。通常,总是使用追加模式,除非您试图第一次创建表。再次查询数据将显示更新的行程。每个写操作都会生成一个由时间戳表示的新提交。在之前的提交中寻找相同的_hoodie_record_keys的_hoodie_commit_time、rider、driver字段的变化.
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare, begin_lon, begin_lat, ts,rider, driver from hudi_trips_snapshot").show()
查询更新后的数据,已经有部分未更新后的数据,提交时间也有其他的值,记录数还是10条.
查看hoodie目录下已经多个一个版本文件 。
从0.9.0开始支持时间旅行查询。目前支持三种查询时间格式,如下所示 。
val tripsSnapshotDF = spark.read.
format("hudi").
option("as.of.instant", "20221122143158632").
load(basePath)
spark.read.
format("hudi").
option("as.of.instant", "2022-11-22 14:31:58.632").
load(basePath)
// 等价于"as.of.instant = 2022-11-22 00:00:00"
spark.read.
format("hudi").
option("as.of.instant", "2022-11-22").
load(basePath)
使用第一种示例如下
val tripsSnapshotDF1 = spark.read.
format("hudi").
option("as.of.instant", "20221121184124298").
load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot1")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare, begin_lon, begin_lat, ts,rider, driver from hudi_trips_snapshot1").show()
Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供需要流化更改的开始时间。如果希望在给定的提交之后进行所有更改(通常是这样),则不需要指定endTime。这将给出在beginTime提交后发生的所有更改,过滤器为fare > 20.0。该特性的独特之处在于,它现在允许您在批处理数据上编写流管道。利用增量管道可以在批处理数据上创建增量管道.
先将上面的更新数据多执行几次,产生多个版本的数据 。
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
时间可以通过将endTime指向特定的提交时间,将beginTime指向“000”(表示尽可能早的提交时间)来表示.
val beginTime = "000"
val endTime = commits(commits.length - 2)
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
Apache Hudi支持两种类型的删除
先查询当前记录数 。
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
执行软删除后查看记录数,有两条被置为空.
val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
val nullifyColumns = softDeleteDs.schema.fields.
map(field => (field.name, field.dataType.typeName)).
filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
&& !Array("ts", "uuid", "partitionpath").contains(pair._1)))
val softDeleteDf = nullifyColumns.
foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
(ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
softDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY, "upsert").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()
删除传进来的hoodiekey记录,删除操作只支持“追加”模式.
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY, "delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
生成一些新的行程数据,覆盖输入中出现的所有分区。对于批处理ETL作业,此操作比upsert快,批处理ETL作业一次重新计算整个目标分区(与增量更新目标表相反)。这是由于能够完全绕过索引、预合并和upsert写路径中的其他重分区步骤.
先查看当前的key数据 。
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
执行覆盖数据操作(类似hive的insert overwrite的功能)后查看key的数据.
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION.key(),"insert_overwrite").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(basePath)
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)
Hudi支持使用Spark SQL与HoodieSparkSessionExtension SQL扩展写和读数据。在解压的目录下运行Spark SQL和Hudi
nohup hive --service metastore &
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
Spark SQL需要一个显式的create table命令.
接下来通过实际sql演示如何创建不同的表.
创建一个非分区表 。
create database hudi_spark;
use hudi_spark;
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi;
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);
创建外部COW分区表 。
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
create table hudi_existing_tbl using hudi
location '/tmp/hudi/hudi_cow_pt_tbl';
CTAS,Hudi 支持在Spark SQL使用CTAS (Create Table As Select) 。
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;
创建表属性可以在创建hudi表时设置表属性,关键选项如下:
-- 插入非分区表
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
-- 插入动态分区
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
-- 插入静态分区
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
-- precombinefield提供的表的upsert模式
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
-- bulk_insert模式用于precombinefield提供的表
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
insert into hudi_cow_pt_tbl select 3, 'c0', 1000, '2022-11-23', '10';
select * from hudi_cow_pt_tbl;
-- 记录id=3 修改 `name`
insert into hudi_cow_pt_tbl select 3, 'c1', 1001, '2022-11-23', '10';
select * from hudi_cow_pt_tbl;
-- 基于第一次提交时间的时间旅行,假设 `20220307091628793`
select * from hudi_cow_pt_tbl timestamp as of '20221123153135498' where id = 3;
-- 基于不同时间戳格式的时间旅行
select * from hudi_cow_pt_tbl timestamp as of '2022-11-23 15:31:35.498' where id = 3;
select * from hudi_cow_pt_tbl timestamp as of '2022-11-23' where id = 3;
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
update hudi_cow_pt_tbl set ts = 1005 where name = 'a1_1';
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
select * from hudi_mor_tbl ;
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2022-11-23', '10'), (2, "new_a2", 'delete', '2022-11-23', '11'), (3, "new_a3", 'insert', '2022-11-23', '12');
merge into hudi_cow_pt_tbl as target
using (
select id, name, '1000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;
delete from hudi_cow_pt_tbl where name = 'a1';
insert覆盖分区表使用INSERT_OVERWRITE_TABLE类型的写操作,而非分区表使用INSERT_OVERWRITE_TABLE类型的写操作.
-- 插入覆盖非分区表
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;
-- 用动态分区插入覆盖分区表
insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';
-- 用静态分区插入覆盖分区表
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;
-- 改表名
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
-- 添加列
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
-- 修改列
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
-- 设置表属性
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
-- 显示分区
show partitions hudi_cow_pt_tbl;
-- 删除分区
alter table hudi_cow_pt_tbl drop partition (dt='2022-11-23', hh='10');
本人博客网站 IT小神 www.itxiaoshen.com 。
最后此篇关于大数据下一代变革之必研究数据湖技术Hudi原理实战双管齐下-中的文章就讲到这里了,如果你想了解更多关于大数据下一代变革之必研究数据湖技术Hudi原理实战双管齐下-中的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
初学者 android 问题。好的,我已经成功写入文件。例如。 //获取文件名 String filename = getResources().getString(R.string.filename
我已经将相同的图像保存到/data/data/mypackage/img/中,现在我想显示这个全屏,我曾尝试使用 ACTION_VIEW 来显示 android 标准程序,但它不是从/data/dat
我正在使用Xcode 9,Swift 4。 我正在尝试使用以下代码从URL在ImageView中显示图像: func getImageFromUrl(sourceUrl: String) -> UII
我的 Ubuntu 安装 genymotion 有问题。主要是我无法调试我的数据库,因为通过 eclipse 中的 DBMS 和 shell 中的 adb 我无法查看/data/文件夹的内容。没有显示
我正在尝试用 PHP 发布一些 JSON 数据。但是出了点问题。 这是我的 html -- {% for x in sets %}
我观察到两种方法的结果不同。为什么是这样?我知道 lm 上发生了什么,但无法弄清楚 tslm 上发生了什么。 > library(forecast) > set.seed(2) > tts lm(t
我不确定为什么会这样!我有一个由 spring data elasticsearch 和 spring data jpa 使用的类,但是当我尝试运行我的应用程序时出现错误。 Error creatin
在 this vega 图表,如果我下载并转换 flare-dependencies.json使用以下 jq 到 csv命令, jq -r '(map(keys) | add | unique) as
我正在提交一个项目,我必须在其中创建一个带有表的 mysql 数据库。一切都在我这边进行,所以我只想检查如何将我所有的压缩文件发送给使用不同计算机的人。基本上,我如何为另一台计算机创建我的数据库文件,
我有一个应用程序可以将文本文件写入内部存储。我想仔细看看我的电脑。 我运行了 Toast.makeText 来显示路径,它说:/数据/数据/我的包 但是当我转到 Android Studio 的 An
我喜欢使用 Genymotion 模拟器以如此出色的速度加载 Android。它有非常好的速度,但仍然有一些不稳定的性能。 如何从 Eclipse 中的文件资源管理器访问 Genymotion 模拟器
我需要更改 Silverlight 中文本框的格式。数据通过 MVVM 绑定(bind)。 例如,有一个 int 属性,我将 1 添加到 setter 中的值并调用 OnPropertyChanged
我想向 Youtube Data API 提出请求,但我不需要访问任何用户信息。我只想浏览公共(public)视频并根据搜索词显示视频。 我可以在未经授权的情况下这样做吗? 最佳答案 YouTube
我已经设置了一个 Twilio 应用程序,我想向人们发送更新,但我不想回复单个文本。我只是想让他们在有问题时打电话。我一切正常,但我想在发送文本时显示传入文本,以确保我不会错过任何问题。我正在使用 p
我有一个带有表单的网站(目前它是纯 HTML,但我们正在切换到 JQuery)。流程是这样的: 接受用户的输入 --- 5 个整数 通过 REST 调用网络服务 在服务器端运行一些计算...并生成一个
假设我们有一个名为 configuration.js 的文件,当我们查看内部时,我们会看到: 'use strict'; var profile = { "project": "%Projec
这部分是对 Previous Question 的扩展我的: 我现在可以从我的 CI Controller 成功返回 JSON 数据,它返回: {"results":[{"id":"1","Sourc
有什么有效的方法可以删除 ios 中 CBL 的所有文档存储?我对此有疑问,或者,如果有人知道如何从本质上使该应用程序像刚刚安装一样,那也会非常有帮助。我们正在努力确保我们的注销实际上将应用程序设置为
我有一个 Rails 应用程序,它与其他 Rails 应用程序通信以进行数据插入。我使用 jQuery $.post 方法进行数据插入。对于插入,我的其他 Rails 应用程序显示 200 OK。但在
我正在为服务于发布请求的 API 调用运行单元测试。我正在传递请求正文,并且必须将响应作为帐户数据返回。但我只收到断言错误 注意:数据是从 Azure 中获取的 spec.js const accou
我是一名优秀的程序员,十分优秀!