- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
@ 。
本节通过一个简单Flink写入Hudi表的编程示例,后续可结合自身业务拓展,先创建一个Maven项目,这次就使用Java来编写Flink程序.
由于中央仓库没有scala2.12版本的资源,前面文章已经编译好相关jar,那这里就将hudi-flink1.15-bundle-0.12.1.jar手动安装到本地maven仓库 。
mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.1 -Dpackaging=jar -Dfile=./hudi-flink1.15-bundle-0.12.1.jar
Pom文件内容添加如下内容:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itxs</groupId>
<artifactId>hudi-flink-demo</artifactId>
<version>1.0</version>
<name>hudi-flink-demo</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<hoodie.version>0.12.1</hoodie.version>
<hadoop.version>3.3.4</hadoop.version>
<flink.version>1.15.1</flink.version>
<slf4j.version>2.0.5</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink_${scala.binary.version}</artifactId>
<version>${hoodie.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
创建一个HudiDemo的Java文件实现一个简单写入hudi表流程 。
package cn.itxs;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.concurrent.TimeUnit;
public class HudiDemo
{
public static void main( String[] args )
{
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 本地启动flink的web页面
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
embeddedRocksDBStateBackend.setDbStoragePath("file:///D:/rocksdb");
embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
env.setStateBackend(embeddedRocksDBStateBackend);
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop1:9000/checkpoints/flink");
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
checkpointConfig.setTolerableCheckpointFailureNumber(5);
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE source_a2 (\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" `partition` varchar(20),\n" +
" PRIMARY KEY(uuid) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1'\n" +
")"
);
tableEnv.executeSql("CREATE TABLE a2 (\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" `partition` varchar(20),\n" +
"PRIMARY KEY(uuid) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'hudi',\n" +
" 'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/a2',\n" +
" 'table.type' = 'MERGE_ON_READ'\n" +
")"
);
tableEnv.executeSql("insert into a2 select * from source_a2");
}
}
通过使用createLocalEnvironmentWithWebUI开启动FlinkWebUI,也即是可以在本地上查看flink的web页面 。
本地rocksdb状态后端也有对应的存储数据 。
HDFS上也可以查看到刚刚创建的hudi表信息 。
对上面小修改一下代码,将最前面的环境中注释createLocalEnvironmentWithWebUI和setDbStoragePath,放开getExecutionEnvironment;将表名改为a3,执行mvn package编译打包,将打包的文件上传 。
flink run -t yarn-per-job -c cn.itxs.HudiDemo /home/commons/flink-1.15.1/otherjars/hudi-flink-demo-1.0.jar
运行日志如下 。
查看Yarn的application_1669357770610_0019 。
查看HDFS也可以查看到刚刚创建的hudi表信息 。
CDC 即 Change Data Capture 变更数据捕获,可以通过 CDC 得知数据源表的更新内容(包含Insert Update 和 Delete),并将这些更新内容作为数据流发送到下游系统。捕获到的数据操作具有一个标识符,分别对应数据的增加,修改和删除.
CDC数据保存了完整的数据库变更,可以通过以下任意一种方式将数据导入Hudi:
说明 。
下面则演示上面第一种方式方式的使用 。
下面以 MySQL 5.7 版本为例说明。修改 my.cnf 文件,增加:
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
先创建演示数据库 test和一张 student 表 。
create database test;
use test;
CREATE TABLE `student` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` varchar(10) NOT NULL,
`age` int NOT NULL,
`class` varchar(10) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE = InnoDB CHARSET = utf8;
将flink-sql-connector-mysql-cdc-2.3.0.jar和flink-sql-connector-kafka-1.15.1.jar上传到flink的lib目录下 。
flink-sql-connector-mysql-cdc-2.3.0.jar可以从github上下载 https://github.com/ververica/flink-cdc-connectors 。
flink-sql-connector-kafka-1.15.1.jar直接在maven仓库下 。
CREATE TABLE student_binlog (
id INT NOT NULL,
name STRING,
age INT,
class STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysqlserver',
'port' = '3308',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'student'
);
create table student_binlog_sink_kafka(
id INT NOT NULL,
name STRING,
age INT,
class STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) with (
'connector'='upsert-kafka',
'topic'='data_test',
'properties.bootstrap.servers' = 'kafka1:9092',
'properties.group.id' = 'testGroup',
'key.format'='json',
'value.format'='json'
);
insert into student_binlog_sink_kafka select * from student_binlog;
查看Flink的Web UI,可以看到刚才提交的job 。
开启tableau方式查询表 。
set 'sql-client.execution.result-mode' = 'tableau';select * from student_binlog_sink_kafka;
往mysql的student表插入和更新数据测试下 。
INSERT INTO student VALUES(1,'张三',16,'高一3班');
COMMIT;
INSERT INTO student VALUES(2,'李四',18,'高三3班');
COMMIT;
UPDATE student SET NAME='李四四' WHERE id = 2;
COMMIT;
CREATE TABLE student_binlog_source_kafka (
id INT NOT NULL,
name STRING,
age INT,
class STRING
)
WITH(
'connector' = 'kafka',
'topic'='data_test',
'properties.bootstrap.servers' = 'kafka1:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE student_binlog_sink_hudi (
id INT NOT NULL,
name STRING,
age INT,
class STRING,
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`class`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/student_binlog_sink_hudi',
'table.type' = 'MERGE_ON_READ',
'write.option' = 'insert',
'write.precombine.field' = 'class'
);
insert into student_binlog_sink_hudi select * from student_binlog_source_kafka;
mysql中student表新增加2条数据 。
INSERT INTO student VALUES(3,'韩梅梅',16,'高二2班');
INSERT INTO student VALUES(4,'李雷',16,'高二2班');
COMMIT;
查看HDFS中已经有相应的分区和数据了 。
参数名称 | 描述 | 默认值 | 备注 |
---|---|---|---|
write.task.max.size | 每个write task使用的最大内存,超过则对数据进行flush | 1024MB | write buffer使用的内存 = write.task.max.size - compaction.max_memory,当write buffer总共使用的内存超过限制,则将最大的buffer进行flush |
write.batch.size | 数据写入batch的大小 | 64MB | 推荐使用默认配置 |
write.log_block.size | Hudi的log writer将数据进行缓存,等达到该参数限制,才将数据flush到disk形成LogBlock | 128MB | 推荐使用默认配置 |
write.merge.max_memory | COW类型的表,进行incremental data和data file能使用的最大heap size | 100MB | 推荐使用默认配置 |
compaction.max_memory | 每个write task进行compaction能使用的最大heap size | 100MB | 如果是online compaction,且资源充足,可以调大该值,如1024MB |
参数名称 | 描述 | 默认值 | 备注 |
---|---|---|---|
write.tasks | write task的并行度,每一个write task写入1~N个顺序buckets | 4 | 增加该值,对小文件的数据没有影响 |
write.bucket_assign.tasks | bucket assigner operators的并行度 | Flink的parallelism.default参数 | 增加该值,会增加bucket的数量,所以也会增加小文件的数量 |
write.index_boostrap.tasks | index bootstrap的并行度 | Flink的parallelism.default参数 | |
read.tasks | read operators的并行度 | 4 | |
compaction.tasks | online compaction的并行度 | 4 | 推荐使用offline compaction |
只适用于online compaction 。
参数名称 | 描述 | 默认值 | 备注 |
---|---|---|---|
compaction.schedule.enabled | 是否定期生成compaction plan | true | 即使compaction.async.enabled = false,也推荐开启该值 |
compaction.async.enabled | MOR类型表默认开启Async Compaction | true | false表示关闭online compaction |
compaction.trigger.strategy | 触发compaction的Strategy | num_commits | 可选参数值:1. num_commits:delta commits数量达到多少;2. time_elapsed:上次compaction过后多少秒;3. num_and_time:同时满足num_commits和time_elapsed;4. num_or_time:满足num_commits或time_elapsed |
compaction.delta_commits | 5 | ||
compaction.delta_seconds | 3600 | ||
compaction.target_io | 每个compaction读写合计的目标IO,默认500GB | 512000 |
hudi源表对应一份hdfs数据,可以通过spark,flink 组件或者hudi客户端将hudi表的数据映射为hive外部表,基于该外部表, hive可以方便的进行实时视图,读优化视图以及增量视图的查询.
这里以hive3.1.3(关于hive可以详细查看前面的文章)、 hudi 0.12.1为例, 其他版本类似 。
将hudi-hadoop-mr-bundle-0.9.0xxx.jar , hudi-hive-sync-bundle-0.9.0xx.jar 放到hiveserver 节点的lib目录下 。
cd /home/commons/apache-hive-3.1.3-bin
cp -rf /home/commons/hudi-release-0.12.1/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.1.jar lib/
cp -rf /home/commons/hudi-release-0.12.1/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.1.jar lib/
按照需求选择合适的方式并重启hive 。
nohup hive --service metastore &
nohup hive --service hiveserver2 &
连接jdbc hive2测试,显示所有数据库 。
Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置示例如下 。
CREATE TABLE t7(
id int,
num int,
ts int,
primary key (id) not enforced
)
PARTITIONED BY (num)
with(
'connector'='hudi',
'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t7',
'table.type'='COPY_ON_WRITE',
'hive_sync.enable'='true',
'hive_sync.table'='h7',
'hive_sync.db'='default',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hadoop2:9083'
);
insert into t7 values(1,1,1);
Flink官网的找到对应文档版本找到connector-hive,下载flink-sql-connector-hive-3.1.2_2.12-1.15.1.jar,上传到flink的lib目录下,建表示例 。
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/commons/apache-hive-3.1.3-bin/conf/'
);
use catalog hive_catalog;
CREATE TABLE t8(
id int,
num int,
ts int,
primary key (id) not enforced
)
PARTITIONED BY (num)
with(
'connector'='hudi',
'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t8',
'table.type'='COPY_ON_WRITE',
'hive_sync.enable'='true',
'hive_sync.table'='h8',
'hive_sync.db'='default',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hadoop2:9083'
);
本人博客网站 IT小神 www.itxiaoshen.com 。
最后此篇关于大数据下一代变革之必研究数据湖技术Hudi原理实战双管齐下-后续的文章就讲到这里了,如果你想了解更多关于大数据下一代变革之必研究数据湖技术Hudi原理实战双管齐下-后续的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
本文全面深入地探讨了Docker容器通信技术,从基础概念、网络模型、核心组件到实战应用。详细介绍了不同网络模式及其实现,提供了容器通信的技术细节和实用案例,旨在为专业从业者提供深入的技术洞见和实
📒博客首页:崇尚学技术的科班人 🍣今天给大家带来的文章是《Dubbo快速上手 -- 带你了解Dubbo使用、原理》🍣 🍣希望各位小伙伴们能够耐心的读完这篇文章🍣 🙏博主也在学习阶段,如若发
一、写在前面 我们经常使用npm install ,但是你是否思考过它内部的原理是什么? 1、执行npm install 它背后帮助我们完成了什么操作? 2、我们会发现还有一个成为package-lo
Base64 Base64 是什么?是将字节流转换成可打印字符、将可打印字符转换为字节流的一种算法。Base64 使用 64 个可打印字符来表示转换后的数据。 准确的来说,Base64 不算
目录 协程定义 生成器和yield语义 Future类 IOLoop类 coroutine函数装饰器 总结 tornado中的
切片,这是一个在go语言中引入的新的理念。它有一些特征如下: 对数组抽象 数组长度不固定 可追加元素 切片容量可增大 容量大小成片增加 我们先把上面的理念整理在这
文章来源:https://sourl.cn/HpZHvy 引 言 本文主要论述的是“RPC 实现原理”,那么首先明确一个问题什么是 RPC 呢?RPC 是 Remote Procedure Call
源码地址(包含所有与springmvc相关的,静态文件路径设置,request请求入参接受,返回值处理converter设置等等): spring-framework/WebMvcConfigurat
请通过简单的java类向我展示一个依赖注入(inject)原理的小例子虽然我已经了解了spring,但是如果我需要用简单的java类术语来解释它,那么你能通过一个简单的例子向我展示一下吗?提前致谢。
1、背景 我们平常使用手机和电脑上网,需要访问公网上的网络资源,如逛淘宝和刷视频,那么手机和电脑是怎么知道去哪里去拿到这个网络资源来下载到本地的呢? 就比如我去食堂拿吃的,我需要
大家好,我是飞哥! 现在 iptables 这个工具的应用似乎是越来越广了。不仅仅是在传统的防火墙、NAT 等功能出现,在今天流行的的 Docker、Kubernets、Istio 项目中也经
本篇涉及到的所有接口在公开文档中均无,需要下载 GitHub 上的源码,自己创建私有类的文档。 npm run generateDocumentation -- --private yarn gene
我最近在很多代码中注意到人们将硬编码的配置(如端口号等)值放在类/方法的深处,使其难以找到,也无法配置。 这是否违反了 SOLID 原则?如果不是,我是否可以向我的团队成员引用另一个“原则”来说明为什
我是 C#、WPF 和 MVVM 模式的新手。很抱歉这篇很长的帖子,我试图设定我所有的理解点(或不理解点)。 在研究了很多关于 WPF 提供的命令机制和 MVVM 模式的文本之后,我在弄清楚如何使用这
可比较的 jQuery 函数 $.post("/example/handler", {foo: 1, bar: 2}); 将创建一个带有 post 参数 foo=1&bar=2 的请求。鉴于 $htt
如果Django不使用“延迟查询执行”原则,主要问题是什么? q = Entry.objects.filter(headline__startswith="What") q = q.filter(
我今天发现.NET框架在做计算时遵循BODMAS操作顺序。即计算按以下顺序进行: 括号 订单 部门 乘法 添加 减法 但是我四处搜索并找不到任何文档确认 .NET 绝对 遵循此原则,是否有此类文档?如
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
API 回顾 在创建 Viewer 时可以直接指定 影像供给器(ImageryProvider),官方提供了一个非常简单的例子,即离屏例子(搜 offline): new Cesium.Viewer(
As it currently stands, this question is not a good fit for our Q&A format. We expect answers to be
我是一名优秀的程序员,十分优秀!