- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
@ 。
Apache Paimon 官网 https://paimon.apache.org/ 最新稳定版本为0.4.0-incubating,0.5-SNAPSHOT正在开发 。
Apache Paimon 文档地址 https://paimon.apache.org/docs/master/ 。
Apache Paimon 源码地址 https://github.com/apache/incubator-paimon/ 。
Apache Paimon (incubating) 目前属于Apache 软件基金会 (ASF) 的孵化项目,其原项目为由Flink官方维护的Flink Table Store;其设计为一个开源流数据湖平台,包揽Streaming实时计算能力和LakeHouse架构优势,统一了存储,具有高速数据摄取,变更日志跟踪和高效的实时分析强大能力.
Apache Paimon 采用开放的数据格式和技术理念,不仅支持Flink SQL编写和本地查询,还可以与其他诸多业界主流计算引擎进行对接.
Apache Paimon 适用于需要在流数据上进行实时查询和分析的场景。它可以帮助用户更容易地构建流式数据湖,实现高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。例如在金融、电子商务、物联网等行业中,可以使用 Apache Paimon 来实现实时推荐、欺诈检测、异常检测等应用.
Paimon 创新的结合了湖存储 + LSM + 列式格式 (ORC, Parquet),为湖存储带来大规模实时更新能力 。
读/写:Paimon支持多种方式来读/写数据和执行OLAP查询.
生态系统:除了Apache Flink, Paimon还支持其他计算引擎的读取,如Apache Hive、Apache Spark、Presto和Trino.
内部:在底层,Paimon将列文件存储在文件系统/对象存储中,并使用LSM树结构来支持大量数据更新和高性能查询.
对于像Apache Flink这样的流媒体引擎,通常有三种类型的连接器
Paimon提供表抽象。它的使用方式与传统数据库没有什么不同
表的所有文件都存储在一个基本目录下。Paimon文件以分层的方式组织。下图说明了文件布局。从快照文件开始,Paimon读取器可以递归地访问表中的所有记录.
LSM
数据结构的追加写能力,Paimon 在大规模的更新数据输入的场景中提供了出色的性能。
官方提供对应引擎的版本支持如下 。
# 这里选择下载最新版本Flink1.17.1
wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
# 解压文件
tar -xvf flink-1.17.1-bin-scala_2.12.tgz
# 进入flink目录
cd flink-1.17.1
配置环境变量,修改flink-conf.yaml配置文件 。
# 如果是Flink17版本以下env.java.opts.all则需改为env.java.opts
env.java.opts.all: "-Dfile.encoding=UTF-8"
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4
execution.checkpointing.interval: 10s
# 用于存储和检查点状态
state.backend: rocksdb
# 存储检查点的数据文件和元数据的默认目录
state.checkpoints.dir: hdfs://myns/flink/myns
# savepoints 的默认目标目录(可选)
state.savepoints.dir: hdfs://myns/flink/savepoints
# 用于启用/禁用增量 checkpoints 的标志
state.backend.incremental: true
使用paimon非常简单,和其他数据湖产品一样,都是将jar包放在引擎的目录下 。
# 解决依赖问题,将hadoop的hadoop-mapreduce-client-core-3.3.4.jar拷贝到flink
cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar ./lib/
# 下载最新版的paimon,目前0.5属于快照版本,,可以先进入lib目录,然后下载到当前lib,也可以通过其他地方下载然后上传拷贝到flink的lib目录下
cd lib/
wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.17/0.5-SNAPSHOT/paimon-flink-1.17-0.5-20230802.034234-105.jar
# 由于后续会使用到其他连接器,这里先下载安装好,后面直接使用即可,先下载flink-sql-connector-hive连接器
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
# 下载flink-sql-connector-mysql-cdc连接器
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# 下载flink-sql-connector-kafka连接器
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
# 重新回到flink的根目录
cd ../
# 先保证hadoop环境,通过yarn启动flink集群
./bin/yarn-session.sh -d
# 以提交yarn-session方式启动sql客户端
./bin/sql-client.sh -s yarn-session
测试环境是否可用 。
set 'sql-client.execution.result-mode' = 'tableau';
show databases;
show tables;
select 1;
通过yarn管理页面可以看到有 Flink session cluster运行job,点击该记录的ApplicationMaster跳转到flink管理页面,也可以看到刚才job已经完成,环境准备完毕.
Paimon Catalog可以持久化元数据,当前支持两种类型的metastore 。
下面的Flink SQL注册并使用一个名为fs_catalog的Paimon编目。元数据和表文件存放在hdfs://myns/paimon/fs下.
CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://myns/paimon/fs'
);
show catalogs;
使用Hive Catalog前需要先启动hive元数据服务 。
nohup hive --service metastore &
通过使用Paimon Hive catalog,对catalog的更改将直接影响到相应的Hive metastore。使用Hive catalog,数据库名、表名和字段名应该是小写的.
CREATE CATALOG hive_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoop2:9083',
'hive-conf-dir' = '/home/commons/apache-hive-3.1.3-bin/conf/',
'warehouse' = 'hdfs://myns/paimon/hive'
);
show catalogs;
USE CATALOG hive_catalog;
CREATE TABLE test1 (
id BIGINT,
a INT,
b STRING,
dt STRING COMMENT 'timestamp string in format yyyyMMdd',
PRIMARY KEY(id, dt) NOT ENFORCED
) PARTITIONED BY (dt);
在指定的Catalog中创建表 。
关闭重新进入sql-client后,只剩下默认的default_catalog,因此可以在启动客户端时执行指定创建catalog语句,vim conf/sql-client-init.sql 。
set 'sql-client.execution.result-mode' = 'tableau';
CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://myns/paimon/fs'
);
CREATE CATALOG hive_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoop2:9083',
'hive-conf-dir' = '/home/commons/apache-hive-3.1.3-bin/conf/',
'warehouse' = 'hdfs://myns/paimon/hive'
);
USE CATALOG hive_catalog;
通过-i参数启动执行sql文件,启动后就可以看到hive_catalog之前已创建的表了 。
./bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql
在Paimon Catalog中创建的表由Catalog管理也就是管理表。当表从目录中删除时,它的表文件也将被删除,与hive内部表相似.
CREATE TABLE user_behavior1 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
在删除表之前,应该停止在表上插入作业,否则不能完全删除表文件.
CREATE TABLE user_behavior2 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
指定统计模式:Paimon将自动收集数据文件的统计信息,以加快查询过程。支持四种模式
字段默认值:Paimon表目前支持为表属性中的字段设置默认值,注意不能指定分区字段和主键字段.
CREATE TABLE user_behavior2 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with(
'fields.item_id.deafult-value'='0'
);
表可以通过查询结果创建或填充,简单创建表.
CREATE TABLE items1 (
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE items2 AS SELECT * FROM items1;
/* 分区表 */
CREATE TABLE user_behavior_p1 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE user_behavior_p2 WITH ('partition' = 'dt') AS SELECT * FROM user_behavior_p1;
/* change options */
CREATE TABLE user_behavior_3 (
user_id BIGINT,
item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE user_behavior_4 WITH ('file.format' = 'parquet') AS SELECT * FROM user_behavior_3;
/* 主键 */
CREATE TABLE user_behavior_5 (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) ;
CREATE TABLE user_behavior_6 WITH ('primary-key' = 'dt,hh') AS SELECT * FROM user_behavior_5;
/* 主键 + 分区 */
CREATE TABLE user_behavior_all (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
CREATE TABLE user_behavior_all_as WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM user_behavior_all;
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) ;
CREATE TABLE user_behavior_like LIKE user_behavior;
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (
'bucket' = '2',
'bucket-key' = 'user_id'
);
外部表由Catalog记录,但不由Catalog管理。如果删除外部表,则不会删除其表文件。可以在任何目录中使用Paimon外部表。如果不想创建Paimon目录,而只想读/写表,那么可以考虑使用外部表。Flink SQL支持读写外部表;外部Paimon表是通过指定连接器和路径表属性创建的.
use catalog default_catalog;
CREATE TABLE user_behavior_external (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://myns/paimon/external',
'auto-create' = 'true' -- 如果表路径不存在,此table属性将为空表创建表文件目前仅支持Flink
);
临时表仅由Flink支持。与外部表一样,临时表只是记录,而不是由当前Flink SQL会话管理。如果临时表被删除,它的资源不会被删除。当Flink SQL会话关闭时,也会删除临时表。如果希望将Paimon Catalog与其他表一起使用,但又不希望将它们存储在其他Catalog中,则可以创建一个临时表.
USE CATALOG hive_catalog;
CREATE TEMPORARY TABLE temp_table (
k INT,
v STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://myns/paimon/temp/temp_table.csv',
'format' = 'csv'
);
# 可以使用临时表和其他表进行关联查询
SELECT my_table.k, my_table.v, temp_table.v FROM my_table JOIN temp_table ON my_table.k = temp_table.k;
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
# 修改表属性
ALTER TABLE my_table SET (
'write-buffer-size' = '256 MB'
);
# 修改表名
ALTER TABLE my_table RENAME TO my_table_new;
# 删除表的属性
ALTER TABLE my_table RESET ('write-buffer-size');
# 填写列
ALTER TABLE my_table ADD (c1 INT, c2 STRING);
# 重命名列
ALTER TABLE my_table RENAME c0 TO c1;
# 删除列
ALTER TABLE my_table DROP (c1, c2);
# 更改列的空属性
CREATE TABLE my_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
-- 将列' coupon_info '从NOT NULL更改为可空
ALTER TABLE my_table MODIFY coupon_info FLOAT;
-- 将列' coupon_info '从可空改为NOT NULL如果已经有NULL值,设置如下表选项,在修改表之前静默删除这些记录。
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
ALTER TABLE my_table MODIFY coupon_info FLOAT NOT NULL;
# 更改列注释
ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'buy count'
# 添加列位置
ALTER TABLE my_table ADD c INT FIRST;
ALTER TABLE my_table ADD c INT AFTER b;
# 改变列位置
ALTER TABLE my_table MODIFY col_a DOUBLE FIRST;
ALTER TABLE my_table MODIFY col_a DOUBLE AFTER col_b;
# 修改列类型
ALTER TABLE my_table MODIFY col_a DOUBLE;
# 添加WATERMARK,下面的SQL从现有的列log_ts中添加一个计算列ts,并在列ts上添加一个策略为ts - INTERVAL '1' HOUR的水印,该水印被标记为表my_table的事件时间属性。
CREATE TABLE my_test_wm (
id BIGINT,
name STRING,
log_ts BIGINT
);
ALTER TABLE my_test_wm ADD (
ts AS TO_TIMESTAMP_LTZ(log_ts,3),
WATERMARK FOR ts AS ts - INTERVAL '1' HOUR
);
# 修改WATERMARK
ALTER TABLE my_test_wm MODIFY WATERMARK FOR ts AS ts - INTERVAL '2' HOUR
# 删除WATERMARK
ALTER TABLE my_test_wm DROP WATERMARK
最后此篇关于新一代开源流数据湖平台ApachePaimon入门实操-上的文章就讲到这里了,如果你想了解更多关于新一代开源流数据湖平台ApachePaimon入门实操-上的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
如何使用 ToggleButton 启用/禁用推送通知 示例: ToggleButton 禁用 (OFF) >>> 推送通知应该停止 ToggleButton 启用 (ON) >>> 推送通知 Sho
我有一个 div,它通过简单的转换将自身转换为: div{ transform: translate3d(0, -100%, 0); transition: all .5s; } div.ac
我尝试为静音/取消静音按钮创建一个开/关按钮: override func touchesEnded(touches: NSSet, withEvent event: UIEvent) {
我正在手动设置 Jest 。 我的 repo 结构: my-proj - src - components ... - accordion - index.jsx - t
我有一个这样的测试失败了,因为没有调用模拟,问题是模拟被调用但在测试完成之前。 test('should submit if proper values', () => { const spy =
目前我正在使用标准的 testRegex 逻辑来运行我的测试 "jest": { "moduleFileExtensions": [ "ts", "js"
目前我有这个测试: import toHoursMinutes from '../../../app/utils/toHoursMinutes'; describe('app.utils.toHour
使用Chai,您可以创建一个 spy 对象,如下所示: chai.spy.object([ 'push', 'pop' ]); 使用 Jasmine ,您可以使用: jasmine.createSpy
我正在编写一个 Jest 测试,其中我调用一个函数并期望返回一个对象,如下所示: const repository = container => { const makeBooking = (us
当我单独运行每个测试时,它们都成功了。但是当我通过 npm test 一起运行它们时第二个测试失败: Expected number of calls: 2 Received number of ca
我们最近将两个不同的 repos 迁移到一个 monorepo 中。每个都使用 jest 和自己的自定义配置,在他们自己的 package.json 文件中定义。 我想使用 --projects标志以
我试图模拟属性(property) tz和一个使用 jest 的函数,但我不知道将这两个东西一起模拟: 如果运行类似: jest.mock('moment-timezone', () => () =>
我正在尝试设置 Jest 来测试我的应用程序的发展。我收到以下错误: SyntaxError: Unexpected identifier > 1 | const screenSize = requi
我将 Jest 与 React-Native 结合使用,并且偶然发现了一个问题。 App.js 组件中的一小段代码导致 50:50 分支覆盖率: const storeMiddleware = __D
我在下面创建了一个 Jest 测试文件。但是没有创建该文件的快照。我的代码有什么问题? import React from 'react'; import Carousel from './compo
我正在尝试弄清楚如何更新单个快照文件。在文档中,它说只需添加 -t 并且我假设文件名,但这对我不起作用。 例如,在我使用的终端中。 jest -u -t test/js/tests/component
我是 JEST 新手,目前正在测试一个 Javascript 组件,该组件在其 onComponentDidMount 中进行 API 调用。根据 ajax 调用(api 调用)的返回数据,我的组件显
我正在尝试开玩笑地为我的 Web 组件项目编写测试。我已经在 es2015 预设中使用了 babel。我在加载 js 文件时遇到问题。我遵循了一段代码,其中 document对象有一个 current
我刚刚开始使用 jest,但有些事情我不太清楚。 例如,为什么要测试此功能: const liElement = object => `${object.title}`; 与: expect(liEl
我正在编写需要定义 window.location.href 的单元测试第一个单元测试创建如下 describe('myMethod()', () => { beforeEach(()
我是一名优秀的程序员,十分优秀!