- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
转载:RocksDB增量模式checkpoint大小持续增长的问题及解决
Flink版本:1.13.5
一个使用FlinkSQL开发的生产线上任务, 使用Tumble Window做聚和统计,并且配置table.exec.state.ttl为7200000,设置checkpoint周期为5分钟,使用rocksdb的增量模式。
正常情况下,任务运行一段时间以后,新增和过期的状态达到动态的平衡,随着RocksDB的compaction,checkpoint的大小会在小范围内上下起伏。
实际观察到,checkpoint大小持续缓慢增长,运行20天以后,从最初了100M左右,增长到了2G,checkpoint的时间也从1秒增加到了几十秒。
源码分析
我们看一下RocksIncrementalSnapshotStrategy.RocksDBIncrementalSnapshotOperation类中的get()方法:
public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
boolean completed = false;
SnapshotResult<StreamStateHandle> metaStateHandle = null;
Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap();
HashMap miscFiles = new HashMap();
boolean var15 = false;
SnapshotResult var18;
try {
var15 = true;
metaStateHandle = this.materializeMetaData(snapshotCloseableRegistry);
Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");
this.uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
synchronized(RocksIncrementalSnapshotStrategy.this.materializedSstFiles) {
RocksIncrementalSnapshotStrategy.this.materializedSstFiles.put(this.checkpointId, sstFiles.keySet());
}
IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, RocksIncrementalSnapshotStrategy.this.keyGroupRange, this.checkpointId, sstFiles, miscFiles, (StreamStateHandle)metaStateHandle.getJobManagerOwnedSnapshot());
DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
SnapshotResult snapshotResult;
if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {
IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(RocksIncrementalSnapshotStrategy.this.backendUID, this.checkpointId, directoryStateHandle, RocksIncrementalSnapshotStrategy.this.keyGroupRange, (StreamStateHandle)metaStateHandle.getTaskLocalSnapshot(), sstFiles.keySet());
snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
} else {
snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
}
completed = true;
var18 = snapshotResult;
var15 = false;
} finally {
if (var15) {
if (!completed) {
List<StateObject> statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
statesToDiscard.addAll(sstFiles.values());
this.cleanupIncompleteSnapshot(statesToDiscard);
}
}
}
重点关注uploadSstFiles()方法的实现细节:
Preconditions.checkState(this.localBackupDirectory.exists());
Map<StateHandleID, Path> sstFilePaths = new HashMap();
Map<StateHandleID, Path> miscFilePaths = new HashMap();
Path[] files = this.localBackupDirectory.listDirectory();
if (files != null) {
this.createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
sstFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(sstFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
miscFiles.putAll(RocksIncrementalSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(miscFilePaths, this.checkpointStreamFactory, snapshotCloseableRegistry));
}
进入到createUploadFilePaths()方法:
private void createUploadFilePaths(Path[] files, Map<StateHandleID, StreamStateHandle> sstFiles, Map<StateHandleID, Path> sstFilePaths, Map<StateHandleID, Path> miscFilePaths) {
Path[] var5 = files;
int var6 = files.length;
for(int var7 = 0; var7 < var6; ++var7) {
Path filePath = var5[var7];
String fileName = filePath.getFileName().toString();
StateHandleID stateHandleID = new StateHandleID(fileName);
if (!fileName.endsWith(".sst")) {
miscFilePaths.put(stateHandleID, filePath);
} else {
boolean existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
if (existsAlready) {
sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
} else {
sstFilePaths.put(stateHandleID, filePath);
}
}
}
}
这里是问题的关键,我们可以归纳出主要逻辑:
这里就是增量checkpoint的关键逻辑了, 我们发现一点,增量的checkpoint只针对sst文件, 对其他的misc文件是每次全量备份的,我们进到一个目录节点看一下有哪些文件被全量备份了:
[hadoop@fsp-hadoop-1 db]$ ll
总用量 8444
-rw-r--r-- 1 hadoop hadoop 0 3月 28 14:56 000058.log
-rw-r--r-- 1 hadoop hadoop 2065278 3月 31 10:17 025787.sst
-rw-r--r-- 1 hadoop hadoop 1945453 3月 31 10:18 025789.sst
-rw-r--r-- 1 hadoop hadoop 75420 3月 31 10:18 025790.sst
-rw-r--r-- 1 hadoop hadoop 33545 3月 31 10:18 025791.sst
-rw-r--r-- 1 hadoop hadoop 40177 3月 31 10:18 025792.sst
-rw-r--r-- 1 hadoop hadoop 33661 3月 31 10:18 025793.sst
-rw-r--r-- 1 hadoop hadoop 40494 3月 31 10:19 025794.sst
-rw-r--r-- 1 hadoop hadoop 33846 3月 31 10:19 025795.sst
-rw-r--r-- 1 hadoop hadoop 16 3月 30 19:46 CURRENT
-rw-r--r-- 1 hadoop hadoop 37 3月 28 14:56 IDENTITY
-rw-r--r-- 1 hadoop hadoop 0 3月 28 14:56 LOCK
-rw-rw-r-- 1 hadoop hadoop 38967 3月 28 14:56 LOG
-rw-r--r-- 1 hadoop hadoop 1399964 3月 31 10:19 MANIFEST-022789
-rw-r--r-- 1 hadoop hadoop 10407 3月 28 14:56 OPTIONS-000010
-rw-r--r-- 1 hadoop hadoop 13126 3月 28 14:56 OPTIONS-000012
在增量checkpoint过程中,虽然sst文件所保存的状态数据大小保持动态平衡,但是LOG日志和MANIFEST文件仍然会当向持续增长,所以checkpoint会越来越大,越来越慢。
我在 eventhub 中遇到了 Blob 存储检查点问题。如果我在获取消费者客户端时没有设置 checkpoint_store,我的应用程序运行正常。每当我尝试设置 checkpoint_store
当它说时,辅助名称节点检查点每小时(fs.checkpoint.period 以秒为单位)或如果编辑日志已达到 64 MB(fs.checkpoint.size 以字节为单位)则更早?究竟是什么意思?
我正在运行 PostgreSQL 服务器并将 shared_buffers 限制为 4GB。 当我在数据库中插入大量记录时,检查点进程开始消耗 RAM。即使在一天后,此过程既不会结束也不会减少 RAM
我已经用 sc.setCheckpointDir 设置了检查点目录方法。 /checkpointDirectory/ 然后我创建了一个 rdd 的检查点:rdd.checkpoint()在目录中,我现
我建立了自己的卷积神经网络,在其中跟踪所有可训练变量的移动平均值(tensorflow 1.0): variable_averages = tf.train.ExponentialMovingAver
我们有一个强大的 Postgres 服务器(64 核,384 GB RAM,16 个 15k SAS 驱动器,RAID 10),并且在一天中我们多次重建几个写入密集型的大型数据集。 Apache 和
我需要以编程方式获取不依赖于目录列表和文件扩展验证的现有检查点列表,如果您键入: tf.train.get_checkpoint_state('checkpoints') 您可以看到已打印此列表,但我
我一直在到处寻找这个问题的答案,但无济于事。我希望能够运行我的代码并将变量存储在内存中,以便我可以设置一个“检查点”,我可以在将来运行它。原因是我有一个相当昂贵的函数,它需要一些时间来计算(以及用户输
作为我的问题的序言,让我提供一些背景信息:我目前正在研究一个包含许多不同步骤的数据管道。每一步都可能出错,而且很多都需要一些时间(不是很多,但在几分钟的数量级)。 因此,管道目前受到人工的严格监督。分
本文整理了Java中org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore类的一些代码示例,展示了ZooKeeper
本文整理了Java中org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter类的一些代码示例,展示了ZooKeeperCheck
本文整理了Java中org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory类的一些代码示例,展示了ZooKeepe
根据 this question和我读过的文档,Spark Streaming 的 foreachRDD(someFunction) 将让 someFunction 本身仅在驱动程序进程中执行,但如果
我正在使用简单的猫与狗数据集在 Google Colab 上尝试新添加的 TPU 支持。 在创建了一个简单的 CNN 之后,我尝试将模型导出到 TPU。但它因错误而失败 TypeError: Chec
我正在尝试在tensorflow-serving中使用重新训练的inception-v3模型。但看来我必须提供一个“检查点”。我想知道如何获得这些“检查点”? retrain.py 返回一个 retr
所以我有一个基于 census tutorial 的 ML 引擎包我尝试使用 --min-eval-Frequency 标志每 N 个步骤执行一次评估,但我不断在 stackdriver 日志中收到消
我可以通过以下方式在检查点( https://www.tensorflow.org/beta/guide/checkpoints#manually_inspecting_checkpoints )中保
我刚刚在 IndoorAtlas 上进行了分析,并使用 ios SDK 尝试了示例应用程序。当我四处走动时,我在创建的平面图中更新了我的位置。 我想知道当我到达我在楼层内创建检查点的位置时应该如何接收
我正在训练一个 tensorflow 模型,在每个 epoch 之后我保存模型状态并 pickle 一些数组。到目前为止,我的模型做了 2 个时期,保存状态的文件夹包含以下文件: checkpoint
您好,我正在尝试运行一个经常因 StackoverflowError 而失败的长 sparkjob。该作业读取一个 parquetfile 并在 foreach 循环中创建一个 rdd。在做了一些研究
我是一名优秀的程序员,十分优秀!