- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
转载:RocksDB增量模式checkpoint大小持续增长的问题及解决
Flink版本:1.13.5
一个使用FlinkSQL开发的生产线上任务, 使用Tumble Window做聚和统计,并且配置table.exec.state.ttl为7200000,设置checkpoint周期为5分钟,使用rocksdb的增量模式。
正常情况下,任务运行一段时间以后,新增和过期的状态达到动态的平衡,随着RocksDB的compaction,checkpoint的大小会在小范围内上下起伏。
实际观察到,checkpoint大小持续缓慢增长,运行20天以后,从最初了100M左右,增长到了2G,checkpoint的时间也从1秒增加到了几十秒。
源码分析
我们看一下RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation类中的get()方法:
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
boolean completed = false;
SnapshotResult<StreamStateHandle> metaStateHandle = null;
Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap();
HashMap miscFiles = new HashMap();
boolean var15 = false;
SnapshotResult var18;
try {
var15 = true;
metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");
this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
synchronized(RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {
RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());
}
IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());
DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
SnapshotResult snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
var18 = snapshotResult;
var15 = false;
} finally {
if (var15) {
if (!completed) {
List<StateObject> statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
this.cleanupIncompleteSnapshot(statesToDiscard);
}
}
}
重点关注uploadSstFiles()方法的实现细节:
Preconditions.checkState(this.localBackupDirectory.exists());
Map<StateHandleID, Path> sstFilePaths = new HashMap();
Map<StateHandleID, Path> miscFilePaths = new HashMap();
Path[] files = this.localBackupDirectory.listDirectory();
if (files != null) {
this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
}
进入到createUploadFilePaths()方法:
private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {
Path[] var5 = files;
int var6 = files.length;
for(int var7 = 0; var7 < var6; ++var7) {
Path filePath = var5[var7];
String fileName = filePath.getFileName().toString();
StateHandleID stateHandleID = new StateHandleID(fileName);
if (!fileName.endsWith(".sst")) {
miscFilePaths.put(stateHandleID, filePath);
} else {
boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
if (existsAlready) {
sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
} else {
sstFilePaths.put(stateHandleID, filePath);
}
}
}
}
这里是问题的关键,我们可以归纳出主要逻辑:
这里就是增量checkpoint的关键逻辑了, 我们发现一点,增量的checkpoint只针对sst文件, 对其他的misc文件是每次全量备份的,我们进到一个目录节点看一下有哪些文件被全量备份了:
[hadoop@fsp-hadoop-1 db]$ ll
总用量 8444
-rw-r--r-- 1 hadoop hadoop 0 3月 28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 3月 31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 3月 31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop 75420 3月 31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop 33545 3月 31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop 40177 3月 31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop 33661 3月 31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop 40494 3月 31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop 33846 3月 31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop 16 3月 30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop 37 3月 28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop 0 3月 28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop 38967 3月 28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 3月 31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop 10407 3月 28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop 13126 3月 28 14:56 OPTIONS-000012
在增量checkpoint过程中,虽然sst文件所保存的状态数据大小保持动态平衡,但是LOG日志和MANIFEST文件仍然会当向持续增长,所以checkpoint会越来越大,越来越慢。
@Cacheable在同一类中方法调用无效 上述图片中,同一个类中genLiveBullets()方法调用同类中的queryLiveByRoom()方法,这样即便标识了Cacheable标签,
目录 @Transaction注解导致动态切换更改数据库失效 使用场景 遇到问题 解决 @Transaction
@RequestBody不能class类型匹配 在首次第一次尝试使用@RequestBody注解 开始加载字符串使用post提交(貌似只能post),加Json数据格式传输的时候,
目录 @Autowired注入static接口问题 @Autowired自动注入普通service很方便 但是如果注入static修饰的serv
目录 @RequestBody部分属性丢失 问题描述 JavaBean实现 Controller实现
目录 解决@PathVariable参数接收不完整的问题 今天遇到的问题是: 解决办法: @PathVariable接受的参
这几天在项目里面发现我使用@Transactional注解事务之后,抛了异常居然不回滚。后来终于找到了原因。 如果你也出现了这种情况,可以从下面开始排查。 1、特性 先来了解一下@Trans
概述: ? 1
场景: 在处理定时任务时,由于这几个方法都是静态方法,在aop的切面中使用@Around注解,进行监控方法调用是否有异常。 发现aop没有生效。 代码如下:
最近做项目的时候 用户提出要上传大图片 一张图片有可能十几兆 本来用的第三方的上传控件 有限制图片上传大小的设置 以前设置的是2M&nb
我已经实现了这个SCIM reference code在我们的应用程序中。 我实现的代码确实通过了此postman link中存在的所有用户测试集合。 。我的 SCIM Api 也被 Azure 接受
我一直对“然后”不被等待的行为感到困扰,我明白其原因。然而,我仍然需要绕过它。这是我的用例。 doWork(family) { return doWork1(family)
我正在尝试查找 channel 中的消息是否仍然存在,但是,我不确定如何解决 promise ,查看其他答案和文档,我可以看到它可能是通过函数实现的,但我是不完全确定如何去做。我希望能在这方面获得一些
我有以下情况: 同一工作区中的 2 个 Eclipse 项目:Apa 和 Bepa(为简洁起见,使用化名)。 Apa 项目引用(包括)Bepa 项目。 我在 Bepa 有一个类 X,具有公共(publ
这个问题已经有答案了: Why am I getting a NoClassDefFoundError in Java? (31 个回答) 已关闭 6 年前。 我正在努力学习 spring。所以我输入
我正在写一个小游戏,屏幕上有许多圆圈在移动。 我在两个线程中管理圈子,如下所示: public void run() { int stepCount = 0; int dx;
我在使用 Sympy 求解方程时遇到问题。当我运行代码时,例如: 打印(校正(10)) 我希望它打印一个数字 f。相反,它给我错误:执行中止。 def correction(r): from
好吧,我制作的每个页面都有这个问题。我不确定我做错了什么,但我所有的页面都不适用于所有分辨率。可能是因为我使用的是宽屏?大声笑我不确定,但在小于宽屏分辨率的情况下,它永远不会看起来正确。它的某些部分你
我正在尝试像这样进行一个非常简单的文化 srting 检查 if(culture.ToUpper() == "ES-ES" || "IT-IT") { //do something } else
Closed. This question is off-topic. It is not currently accepting answers. Learn more。 想改进这个问题吗?Upda
我是一名优秀的程序员,十分优秀!