- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
Influxdb 的Docker版本目前最高是1.8.3. 官方最高版本是2.0.
Note: We recommend using the new client libraries on this page to leverage the new read (via Flux) and write APIs and prepare for conversion to InfluxDB 2.0 and InfluxDB Cloud 2.0. For more information, see InfluxDB 2.0 API compatibility endpoints. Client libraries for InfluxDB 1.7 and earlier may continue to work, but are not maintained by InfluxData.
官方推荐使用本页新读(Flux)和写API来访问Influx 2.0
查看官方文档发现,客户端有:
influx-client-java
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>1.12.0</version>
</dependency>
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token);
//
// Create bucket "iot_bucket" with data retention set to 3,600 seconds
//
BucketRetentionRules retention = new BucketRetentionRules();
retention.setEverySeconds(3600);
Bucket bucket = influxDBClient.getBucketsApi().createBucket("iot-bucket", retention, "12bdc4164c2e8141");
Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux (endpoint should be enabled by flux-enabled option)
虽然InfluxDB 2.0 API兼容 influxdb 1.8.0,但是应该开启 flux
[http]
enabled = true
bind-address = ":8086"
auth-enabled = true
log-enabled = true
suppress-write-log = false
write-tracing = false
flux-enabled = true //重点在这
Flux是InfluxQL和其他类似SQL的查询语言的替代品,用于查询和分析数据。Flux使用功能语言模式,使其功能强大,灵活,并能够克服InfluxQL的许多限制。
看样子官方是更加推荐使用 Flux语法
例子:(bucket是InfluxDb2.0提出的概念,Influxdb1.x没有)
data = from(bucket: "db/rp")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "example-measurement" and
r._field == "example-field"
)
Key-Concepts
从这篇文章可以看到 2.0 是把Database和RententionPolicies合并为Bucket。因此Flink指标收集的位置是 flink/flink。
另外Organization是什么?
InfluxDB组织是一组用户的工作空间。所有仪表板,任务,存储桶和用户均属于组织。有关组织的更多信息,请参阅管理组织。
InfluxDb 1.8没有Organization的概念。通过下方的例子,可以看到1.8版本的org=-。
public class InfluxDB18Example {
public static void main(final String[] args) {
String database = "telegraf";
String retentionPolicy = "autogen";
InfluxDBClient client = InfluxDBClientFactory.createV1("http://localhost:8086",
"username",
"password".toCharArray(),
database,
retentionPolicy);
System.out.println("*** Write Points ***");
try (WriteApi writeApi = client.getWriteApi()) {
Point point = Point.measurement("mem")
.addTag("host", "host1")
.addField("used_percent", 29.43234543);
System.out.println(point.toLineProtocol());
writeApi.writePoint(point);
}
System.out.println("*** Query Points ***");
String query = String.format("from(bucket: \"%s/%s\") |> range(start: -1h)", database, retentionPolicy);
List<FluxTable> tables = client.getQueryApi().query(query);
tables.get(0).getRecords()
.forEach(record -> System.out.println(String.format("%s %s: %s %s",
record.getTime(), record.getMeasurement(), record.getField(), record.getValue())));
client.close();
}
}
public static List<JobLastCheckpointExternalPath> getCheckPoints(String jobId) {
InfluxDbConfig config = new InfluxDbConfig();
config.setHost("http://10.11.159.156:8099");
config.setDatabase("flink");
config.setPassword("flink");
config.setUsername("flink");
config.setRetentionPolicy("one_hour");
String database = config.getDatabase();
String retentionPolicy = config.getRetentionPolicy();
InfluxDBClient client = InfluxDBClientFactory.createV1(config.getHost(),
config.getUsername(),
config.getPassword().toCharArray(),
database,
retentionPolicy);
client.setLogLevel(LogLevel.BASIC);
QueryApi queryApi = client.getQueryApi();
String query = String.format("from(bucket: \"%s/%s\") |> range(start: -30m) |> filter(fn: (r) =>\n" +
" r._measurement == \"jobmanager_job_lastCheckpointExternalPath\" and\n" +
" r.job_id == \"%s\"\n" +
" ) |> sort(columns: [\"_time\"], desc: true) |> limit(n:100)", database, retentionPolicy, jobId);
//
// Query data
//
List<JobLastCheckpointExternalPath> tables = queryApi.query(query, JobLastCheckpointExternalPath.class);
client.close();
return tables;
}
@Measurement(name = "jobmanager_job_lastCheckpointExternalPath")
public static
class JobLastCheckpointExternalPath {
@Column(timestamp = true)
Instant time;
@Column(name = "job_name")
String jobName;
@Column(name = "job_id")
String jobId;
@Column(name = "value")
String value;
@Override
public String toString() {
return "JobLastCheckpointExternalPath{" +
"time=" + time +
", jobName='" + jobName + '\'' +
", jobId='" + jobId + '\'' +
", value='" + value + '\'' +
'}';
}
}
fields = ["3361e97159f5d473f273b38a96e7ba06"]
from(bucket: "flink/one_hour") |> range(start: -1h)
|> filter(fn: (r) => r._measurement == "jobmanager_job_lastCheckpointExternalPath" and contains(value: r.job_id, set: fields)
and r["_value"] != "n/a"
)
|> keep(columns: ["_time", "_value", "job_id", "job_name"])
|> limit(n:1000)
|> sort(columns: ["_time"], desc: true)
注意:经过实际使用发现 sort 写在limit 前面会导致实际排序不可靠。(就连官方文档都是sort在limit前面)
sort() and limit()
contains(value: r.job_id, set: fields)
keep(columns: ["_time", "_value", "job_id", "job_name"])
因为最近的需求是做FlinkSQL平台,需要在实时计算平台上集成FlinkSQL功能,但目前刚刚有了研究成果,所以有了这篇笔记。 第一步:编写一个流 这里使用python编写的一个流,比Java简
报错记录 提交作业的时候,遇到一些坑,报错具有迷惑性。 413 Request Entity Too Large. Try to raise [rest.client.max-content-le
Flink 自带了一个SQLClient,截至目前Flink-1.13.0,Flink还没有Flink SQL Gateway。 需求 由于需要在提供友好的用户界面,类似于低代码平台,因此需要一个
问题类别 Spark框架自身的问题 Hadoop全家桶的问题 开发者使用的库的问题 排查 1、 已知Hadoop-common-2.6.0的UGI存在bug,代码为HADOOP
问题 报错 org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table f
FlinkSQL 和常见的SQL一样,也分为 DDL,DML,DQL,DCL。 本文的主要内容是探讨如何利用FlinkAPI 对多行SQL语句进行校验。 SQL语言共分为四大类:数据查询语言
本文首发自https://www.cnblogs.com/slankka/ 转载请注明出处。 本文的主要内容是介绍如何动态加载Flink作业的UDF。 Classloader 加载UDF一定是c
起因 由于近期研究了ElasticSearch的Connector,但是目前生产环境不需要此jar。 Flink社区的一些小伙伴交流的时候,发现有人在使用Flink Session-Clu
之前的一篇文章【Flink系列】构建实时计算平台——特别篇,用InfluxDb收集Flink Metrics ,里面写道 Influxdb 1.8,100个作业的情况下, 内存占用峰值会超过
研究内容 flink客户端提交命令为 flink run ...., 如果客户端的main 需要读取系统属性(System properties),读取系统属性变量的位置有两种: 从作业的m
背景 Flink 的指标非常多,同时由于参数配置的不正确,导致指标上报频率过快,PushGateway集群压力过大。 相关文章 如果读者在找限流、拦截指标的做法,可参考我的其他文章,本篇略显敷衍
Flink提供了Checkpoint/Savepoint来保存状态,以便在出错时进行恢复,在上一个状态的基础上恢复计算流程。 问题 1. 如何开启Checkpoint? Flink-Checkp
问题 flink-1.13.5 用户提交FlinkSQL作业,连接Hive时发现缺少MRVersion类的定义。 java.lang.NoClassDefFoundError: org/apach
Influxdb 快速入门 从Docker启动 Influxdb docker pull influxdb:LATEST docker run -d --name influxdb -p 808
Influxdb Java客户端 Influxdb 的Docker版本目前最高是1.8.3. 官方最高版本是2.0. Note: We recommend using the new cli
问题 Flink通过Flink-hive-connector来连接Hive,但是连接Hive报错。 具体报错是因为: HiveMetaStoreClient连接HiveMetastore 使用的T
我想定义函数 MAX_BY,它接受 T 类型的值和 Number 类型的排序参数,并根据窗口返回最大元素订购(T 类型)。我试过了 public class MaxBy extends Aggrega
问题 Flink SQL 使用Hive Dialect,同时Hive使用1.1.0-CDH5.x.x,报错: Exception in thread "main" java.l
背景 因为要开发Flinksql,决定要使用HiveCatalog的支持,Flink当前最新版本是1.12.2,集群Hive的版本是1.1.0,而且需要用某个Linux用户进行代理。 在实际开发中
问题 使用了Flink-Kafka-Connector(版本1.13.0),使用FlinkKafkaConsumer 上报了KafkaLag指标,但是换成 KafkaSource 却没有任何指标。
我是一名优秀的程序员,十分优秀!