- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
Flink 是一个流处理和批处理统一的大数据框架,专门为高吞吐量和低延迟而设计。开发者可以使用SQL进行流批统一处理,大大简化了数据处理的复杂性。本文将介绍Flink SQL的基本原理、使用方法、流批统一,并通过几个例子进行实践.
Flink SQL建立在Apache Flink之上,利用Flink的强大处理能力,使得用户可以使用SQL语句进行流数据和批数据处理。Flink SQL既支持实时的流数据处理,也支持有界的批数据处理.
Flink SQL用SQL作为处理数据的接口语言,将SQL语句转换成数据流图(Dataflow Graph),再由Flink引擎执行.
使用Flink SQL时,我们通常会遵循如下编码套路,这些套路和使用Flink API的套路是一样的:
TableEnvironment
对象,它是执行Flink SQL语句的核心。这个环境可以是流数据环境,也可以是批数据环境。CREATE TABLE
语句定义输入数据源(source),可以是Kafka、CSV文件等。CREATE TABLE
定义输出数据源(sink),并将处理结果输出。以下是一个从CSV文件读取数据,通过SQL查询,再将数据输出到CSV的完整例子.
input.csv文件
内容,如下:1,product_A,10.5
2,product_B,20.3
3,product_C,15.8
1,product_D,12.2
2,product_A,18.7
编写代码之前先在pom.xml中添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
示例代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSqlDemo {
public static void main(String[] args) throws Exception {
// 设置环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); //为了方便测试看效果,这里并行度设置为1
// 使用EnvironmentSettings创建StreamTableEnvironment,明确设置为批处理模式
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inBatchMode() // 设置为批处理模式,这样后续才能一次性的输出到csv中
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 定义输入数据源
String createSourceTableDdl = "CREATE TABLE csv_source (" +
" user_id INT," +
" product STRING," +
" order_amount DOUBLE" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'file:///path/input.csv'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createSourceTableDdl);
// // 编写 SQL 查询
// String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";
// // 执行查询并打印
// tableEnv.executeSql(query).print();
// env.execute("Flink SQL Demo");
// 定义输出数据源
String createSinkTableDdl = "CREATE TABLE csv_sink (" +
" user_id INT," +
" total_amount DOUBLE" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'file:///path/output.csv'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createSinkTableDdl);
// 执行查询并将结果输出到csv_sink
String query = "INSERT INTO csv_sink " +
"SELECT user_id, SUM(order_amount) as total_amount " +
"FROM csv_source " +
"GROUP BY user_id";
tableEnv.executeSql(query);
// env.execute("Flink SQL Job");
}
}
流批统一是大数据处理领域的一个概念,它指的是使用一套代码来同时处理流数据(Streaming)和批数据(Batching).
流处理和批处理的区别如下:
在早期,流处理和批处理通常需要不同的系统来执行。对于批处理,可能使用如Hadoop这样的框架;而对于流处理,可能使用如Apache Storm这样的框架。这就导致开发者要同时学习多种框架才能处理不同类型的数据作业.
流批统一的概念,就是将这两种数据处理方式合并到一个平台中,这样一个系统既可以处理静止的大批量数据集,也可以处理实时的数据流。这样做的优点是显而易见的:
Flink很好的实现了流批统一,可以让开发人员用相同的方式来编写批处理和流处理程序。不论是对有界(批处理)还是无界(流处理)的数据源,Flink都可以使用相同的API和处理逻辑来处理数据.
Flink 通过内置的表抽象来实现流批一体,这里的"表"可以是动态变化的(例如,来自实时数据流的表)或是静态的(例如,存储在文件或数据库中的批量数据表)。Flink SQL引擎会根据数据的实际来源自动优化执行计划.
Flink SQL的流批统一核心在于三点:
以下是一个完整的代码示例,用Flink来实现流批统一处理。Flink同时从Kafka 和 CSV读取数据,然后合并查询再输出结果:
代码中,先配置了Flink的流处理环境和表环境,然后用DDL语句在Flink中注册了Kafka和文件系统数据源。接着执行了一个SQL查询来合并来自这两种数据源的数据,并计算总金额。最后,打印出查询结果并开始执行Flink作业.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class StreamBatchUnifiedDemo {
public static void main(String[] args) throws Exception {
// 设置流处理的环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Kafka 流处理表
String createKafkaSourceDDL = "CREATE TABLE kafka_stream_orders (" +
"order_id STRING," +
"amount DOUBLE)" +
"WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'topic_test'," +
"'properties.bootstrap.servers' = '10.20.1.26:9092'," +
"'format' = 'json'," +
"'scan.startup.mode' = 'latest-offset'" +
")";
tableEnv.executeSql(createKafkaSourceDDL);
// 文件系统批处理表
String createFilesystemSourceDDL = "CREATE TABLE file_batch_orders (" +
"order_id STRING," +
"amount DOUBLE)" +
"WITH (" +
"'connector' = 'filesystem'," +
"'path' = 'file:///Users/yclxiao/Project/bigdata/flink-blog/doc/input_order.csv'," +
"'format' = 'csv'" +
")";
tableEnv.executeSql(createFilesystemSourceDDL);
// 执行统一查询,计算总金额
Table resultTable = tableEnv.sqlQuery("SELECT SUM(amount) FROM (" +
"SELECT amount FROM kafka_stream_orders " +
"UNION ALL " +
"SELECT amount FROM file_batch_orders)");
// 打印结果
tableEnv.toRetractStream(resultTable, Row.class).print();
// 开始执行程序
env.execute("Stream-Batch Unified Job");
}
}
通过以上示例代码,可以看出Flink SQL的流批一体设计:相同的SQL语句可以用在流处理和批处理中,而不需要做任何修改。Flink背后的执行引擎会自动根据数据的特性(流或者批)来进行相应的优化执行.
这就是Flink SQL非常强大的地方,它减少了开发者需要写不同代码逻辑的需求,简化了复杂的数据处理流程.
Flink SQL是一个非常强大的数据处理工具,可以应对多种复杂的数据处理场景.
本文主要介绍了Flink SQL的基本原理、编码套路、流批统一,再结合正确的代码示例进行实践。希望对你有帮助.
完整代码地址:https://github.com/yclxiao/flink-blog 。
======>>>>>> 关于我 <<<<<<====== 。
本篇完结!欢迎点赞 关注 收藏!!! 。
原文链接:https://mp.weixin.qq.com/s/WqyCjiIMK49T6eDl_Upc1A 。
最后此篇关于10分钟了解FlinkSQL使用的文章就讲到这里了,如果你想了解更多关于10分钟了解FlinkSQL使用的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
因为最近的需求是做FlinkSQL平台,需要在实时计算平台上集成FlinkSQL功能,但目前刚刚有了研究成果,所以有了这篇笔记。 第一步:编写一个流 这里使用python编写的一个流,比Java简
报错记录 提交作业的时候,遇到一些坑,报错具有迷惑性。 413 Request Entity Too Large. Try to raise [rest.client.max-content-le
Flink 自带了一个SQLClient,截至目前Flink-1.13.0,Flink还没有Flink SQL Gateway。 需求 由于需要在提供友好的用户界面,类似于低代码平台,因此需要一个
问题类别 Spark框架自身的问题 Hadoop全家桶的问题 开发者使用的库的问题 排查 1、 已知Hadoop-common-2.6.0的UGI存在bug,代码为HADOOP
问题 报错 org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table f
FlinkSQL 和常见的SQL一样,也分为 DDL,DML,DQL,DCL。 本文的主要内容是探讨如何利用FlinkAPI 对多行SQL语句进行校验。 SQL语言共分为四大类:数据查询语言
本文首发自https://www.cnblogs.com/slankka/ 转载请注明出处。 本文的主要内容是介绍如何动态加载Flink作业的UDF。 Classloader 加载UDF一定是c
起因 由于近期研究了ElasticSearch的Connector,但是目前生产环境不需要此jar。 Flink社区的一些小伙伴交流的时候,发现有人在使用Flink Session-Clu
之前的一篇文章【Flink系列】构建实时计算平台——特别篇,用InfluxDb收集Flink Metrics ,里面写道 Influxdb 1.8,100个作业的情况下, 内存占用峰值会超过
研究内容 flink客户端提交命令为 flink run ...., 如果客户端的main 需要读取系统属性(System properties),读取系统属性变量的位置有两种: 从作业的m
背景 Flink 的指标非常多,同时由于参数配置的不正确,导致指标上报频率过快,PushGateway集群压力过大。 相关文章 如果读者在找限流、拦截指标的做法,可参考我的其他文章,本篇略显敷衍
Flink提供了Checkpoint/Savepoint来保存状态,以便在出错时进行恢复,在上一个状态的基础上恢复计算流程。 问题 1. 如何开启Checkpoint? Flink-Checkp
问题 flink-1.13.5 用户提交FlinkSQL作业,连接Hive时发现缺少MRVersion类的定义。 java.lang.NoClassDefFoundError: org/apach
Influxdb 快速入门 从Docker启动 Influxdb docker pull influxdb:LATEST docker run -d --name influxdb -p 808
Influxdb Java客户端 Influxdb 的Docker版本目前最高是1.8.3. 官方最高版本是2.0. Note: We recommend using the new cli
问题 Flink通过Flink-hive-connector来连接Hive,但是连接Hive报错。 具体报错是因为: HiveMetaStoreClient连接HiveMetastore 使用的T
我想定义函数 MAX_BY,它接受 T 类型的值和 Number 类型的排序参数,并根据窗口返回最大元素订购(T 类型)。我试过了 public class MaxBy extends Aggrega
问题 Flink SQL 使用Hive Dialect,同时Hive使用1.1.0-CDH5.x.x,报错: Exception in thread "main" java.l
背景 因为要开发Flinksql,决定要使用HiveCatalog的支持,Flink当前最新版本是1.12.2,集群Hive的版本是1.1.0,而且需要用某个Linux用户进行代理。 在实际开发中
问题 使用了Flink-Kafka-Connector(版本1.13.0),使用FlinkKafkaConsumer 上报了KafkaLag指标,但是换成 KafkaSource 却没有任何指标。
我是一名优秀的程序员,十分优秀!