- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
因为要开发Flinksql,决定要使用HiveCatalog的支持,Flink当前最新版本是1.12.2,集群Hive的版本是1.1.0,而且需要用某个Linux用户进行代理。
在实际开发中,遇到两个问题:
1、 Hive1.1.0使用的不是jdbc,而是MetastoreClient,通过Thrift进行连接,而他不支持HADOOP_PROXY_USER;
2、 Kerberos认证需要什么配置文件,是否需要在代码里配置UGI?;
这个问题上一篇文章 已经给出解决方案。
具体请参考:hive-metastore(HIVE-15579)
Github代码CommitGithub链接
经过测试,发现并不需要在代码中写任何 UserGroupInformation 和 doAs 相关的代码。
需要的配置文件如下:
hive-site.xml
<!--First created by Slankka-->
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://xxxxx.xxxxx.xxxxxx.com:xxxxx</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<!--property>
<name>hive.metastore.execute.setugi</name>
<value>slankka</value>
</property-->
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
</property>
<!--property>
<name>hive.server2.enable.doAs</name>
<value>true</value>
</property-->
<property>
<name>hive.metastore.sasl.enabled</name>
<value>true</value>
</property>
<!--property>
<name>hive.server2.authentication</name>
<value>kerberos</value>
</property-->
<property>
<name>hive.metastore.kerberos.principal</name>
<value>hive/_HOST@slankka.COM</value>
</property>
<property>
<name>hive.server2.authentication.kerberos.principal</name>
<value>hive/_HOST@slankka.COM</value>
</property>
</configuration>
另外还需要一个文件:
core-site.xml
<configuration>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
</configuration>
这样在程序启动的时候,只需要指定这两个配置文件即可。
另外不需要任何HADOOP_CONF_DIR或者HIVE_CONF_DIR。
以上内容就是最小化配置。
代码示例:
public static void main(String[] args) {
Catalog catalog = new HiveCatalog("slankka", "flink", args[0], args[1], "1.1.0");
try {
// List<String> strings = catalog.listDatabases();
// for (String database : strings) {
// System.out.println(database);
// }
// ObjectPath objectPath = new ObjectPath("flink", "objectName");
// catalog.createFunction(objectPath, new CatalogFunctionImpl("className", FunctionLanguage.JAVA), false);
// catalog.dropFunction(objectPath, false);
// catalog.alterFunction(objectPath, new CatalogFunctionImpl("className", FunctionLanguage.JAVA), false);
// CatalogFunction function = catalog.getFunction(objectPath);
// catalog.listFunctions("flink");
// catalog.createTable(objectPath, new CatalogTableImpl());
catalog.open();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String[] strings = tableEnv.listCatalogs();
Arrays.stream(strings).forEach(System.out::println);
boolean pfc = Arrays.asList(strings).contains("slankka");
if (!pfc) {
tableEnv.registerCatalog("slankka", catalog);
}
tableEnv.useCatalog("slankka");
tableEnv.useDatabase("flink");
tableEnv.executeSql("drop table if exists slankka.flink.WordCountSink");
TableResult tableResult = tableEnv.executeSql("create table slankka.flink.WordCountSink (\n" +
" word STRING,\n" +
" len INT\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://slankka.com:3306/rtc',\n" +
" 'table-name' = 'flink_sink_test',\n" +
" 'username' = 'root',\n" +
" 'password' = '1'\n" +
")");
tableResult.print();
String[] tables = tableEnv.listTables();
System.out.println("Tables: --->");
Arrays.stream(tables).forEach(System.out::println);
} finally {
catalog.close();
}
}
}
执行结果如下:
21/03/15 15:23:08 INFO hive.HiveCatalog: Setting hive conf dir as /data/work/cloudservice/slankka/lib/
21/03/15 15:23:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/15 15:23:09 INFO hive.HiveCatalog: Created HiveCatalog 'slankka'
21/03/15 15:23:09 INFO hive.metastore: HADOOP_PROXY_USER is set. Using delegation token for HiveMetaStore connection.
21/03/15 15:23:09 INFO hive.metastore: Trying to connect to metastore with URI thrift://xxxxx.xxxxx.xxxxx.com:xxxx
21/03/15 15:23:09 INFO hive.metastore: Opened a connection to metastore, current connections: 1
21/03/15 15:23:09 INFO hive.metastore: Connected to metastore.
21/03/15 15:23:09 INFO hive.metastore: Closed a connection to metastore, current connections: 0
21/03/15 15:23:09 INFO hive.metastore: Trying to connect to metastore with URI thrift://xxxxx.xxxxx.xxxxx.com:xxxx
21/03/15 15:23:09 INFO hive.metastore: Opened a connection to metastore, current connections: 1
21/03/15 15:23:09 INFO hive.metastore: Connected to metastore.
21/03/15 15:23:09 INFO hive.HiveCatalog: Connected to Hive metastore
default_catalog
21/03/15 15:23:10 INFO catalog.CatalogManager: Set the current default catalog as [slankka] and the current default database as [flink].
+--------+
| result |
+--------+
| OK |
+--------+
1 row in set
Tables: --->
wordcountsink
因为最近的需求是做FlinkSQL平台,需要在实时计算平台上集成FlinkSQL功能,但目前刚刚有了研究成果,所以有了这篇笔记。 第一步:编写一个流 这里使用python编写的一个流,比Java简
报错记录 提交作业的时候,遇到一些坑,报错具有迷惑性。 413 Request Entity Too Large. Try to raise [rest.client.max-content-le
Flink 自带了一个SQLClient,截至目前Flink-1.13.0,Flink还没有Flink SQL Gateway。 需求 由于需要在提供友好的用户界面,类似于低代码平台,因此需要一个
问题类别 Spark框架自身的问题 Hadoop全家桶的问题 开发者使用的库的问题 排查 1、 已知Hadoop-common-2.6.0的UGI存在bug,代码为HADOOP
问题 报错 org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table f
FlinkSQL 和常见的SQL一样,也分为 DDL,DML,DQL,DCL。 本文的主要内容是探讨如何利用FlinkAPI 对多行SQL语句进行校验。 SQL语言共分为四大类:数据查询语言
本文首发自https://www.cnblogs.com/slankka/ 转载请注明出处。 本文的主要内容是介绍如何动态加载Flink作业的UDF。 Classloader 加载UDF一定是c
起因 由于近期研究了ElasticSearch的Connector,但是目前生产环境不需要此jar。 Flink社区的一些小伙伴交流的时候,发现有人在使用Flink Session-Clu
之前的一篇文章【Flink系列】构建实时计算平台——特别篇,用InfluxDb收集Flink Metrics ,里面写道 Influxdb 1.8,100个作业的情况下, 内存占用峰值会超过
研究内容 flink客户端提交命令为 flink run ...., 如果客户端的main 需要读取系统属性(System properties),读取系统属性变量的位置有两种: 从作业的m
背景 Flink 的指标非常多,同时由于参数配置的不正确,导致指标上报频率过快,PushGateway集群压力过大。 相关文章 如果读者在找限流、拦截指标的做法,可参考我的其他文章,本篇略显敷衍
Flink提供了Checkpoint/Savepoint来保存状态,以便在出错时进行恢复,在上一个状态的基础上恢复计算流程。 问题 1. 如何开启Checkpoint? Flink-Checkp
问题 flink-1.13.5 用户提交FlinkSQL作业,连接Hive时发现缺少MRVersion类的定义。 java.lang.NoClassDefFoundError: org/apach
Influxdb 快速入门 从Docker启动 Influxdb docker pull influxdb:LATEST docker run -d --name influxdb -p 808
Influxdb Java客户端 Influxdb 的Docker版本目前最高是1.8.3. 官方最高版本是2.0. Note: We recommend using the new cli
问题 Flink通过Flink-hive-connector来连接Hive,但是连接Hive报错。 具体报错是因为: HiveMetaStoreClient连接HiveMetastore 使用的T
我想定义函数 MAX_BY,它接受 T 类型的值和 Number 类型的排序参数,并根据窗口返回最大元素订购(T 类型)。我试过了 public class MaxBy extends Aggrega
问题 Flink SQL 使用Hive Dialect,同时Hive使用1.1.0-CDH5.x.x,报错: Exception in thread "main" java.l
背景 因为要开发Flinksql,决定要使用HiveCatalog的支持,Flink当前最新版本是1.12.2,集群Hive的版本是1.1.0,而且需要用某个Linux用户进行代理。 在实际开发中
问题 使用了Flink-Kafka-Connector(版本1.13.0),使用FlinkKafkaConsumer 上报了KafkaLag指标,但是换成 KafkaSource 却没有任何指标。
我是一名优秀的程序员,十分优秀!