- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
2020年团队决定对elasticsearch升级。es(elasticsearch缩写,下同)当前版本为0.9x,升级到5.x版本。es在本公司承载三个部分的业务,站内查询,订单数据统计,elk日志分析.
对于站内查询和订单数据统计,当前业务架构是 。
mysql -> canal -> kafka -> es 。
(可以考虑使用kafka connector 代替canal) 。
难点是在升级的时候如何不影响当前业务.
下载5.x版本的es,在新的机器上部署新的集群.
由于从0.9x到5.x版本跨度比较大,许多java api都发生了变化,需要修复.
一个坑是alias api 发生了语义变化,在后来的自测中修复了此问题.
我们使用索引重建程序来新建索引。重建索引具体步骤如下,我们称线上索引为online index, 新创建的索引为new index.
1.init 。
刷新索引名映射关系,检查当前alias只有一个物理索引.
根据预定义的mapping,创建索引new index.
设置在线索引记录数据变更日志,即记录线上索引消费kafka数据,并存储为change log文件. 。
2.全量索引数据库上的数据到new index 。
从mysql查出数据同步到es中,如果有多个分表,就按照表顺序同步。可以开启多线程批量插入.
3.对new index索引优化 。
refresh, flush 索引。调用force-merge api,进行段合并.
4.重放change log到new index中 。
根据change log 转换为es query,写入到new index。 。
5.暂停线上索引的写入 。
因为online index和new index 使用的是相同的kafka consumer group,所以必须停掉online index的消费功能.
6.关闭change log 。
停止记录在线索引记录数据变更日志.
7.第二阶段重放change log 。
根据change log 转换为es query,写入到new index。 。
8.删除change log 。
删除线索引记录数据变更日志.
9.设置副本数 。
new index创建索引的时候默认副本数为0,现在动态调整副本数为业务需要的值。比如对现实搜索业务设置两个副本,对订单统计类索引不需要副本.
PUT /new_index/_settings
{
"number_of_replicas": 2
}
此阶段可能会比较耗时,需要等待几分钟才能进行下一步操作。更好的做法是调用health api 查看分片状态.
GET _cluster/health
{
"cluster_name" : "testcluster",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 1,
"active_shards" : 1,
"relocating_shards" : 0, // 重新定位的分片
"initializing_shards" : 0, // 初始化中的分片
"unassigned_shards" : 1, // 未分配的分片
"delayed_unassigned_shards": 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50.0
}
10.别名切换 。
POST /_aliases
{
"actions": [
{ "remove": { "index": "online_index", "alias": "my_index" }},
{ "add": { "index": "new_index", "alias": "my_index" }}
]
}
11.运行在线索引 (从kafka里面读取数据) 。
new_index 开始从kafka里面消费最新数据。由于之前的操作可能会有延时,需要等待几分钟才能同步到最新数据.
12.删除旧的索引 。
删除old_index 。
详细代码步骤如下 。
// 1.init
logger.info("初始化");
ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
logger.info("刷新索引名映射关系");
if (!indexContext.refreshIndexName()) {
throw new IndexException("刷新索引映射关系失败");
}
rebuildIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("初始化重建索引环境,当前重建索引名:" + rebuildIndexName);
logger.info("创建索引,索引名:" + rebuildIndexName);
boolean isCreate = false;
try {
isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
} catch (Throwable t) {
logger.info("创建索引失败,本次失败可以不处理,将会自动重试 ...");
}
logger.info("设置在线索引记录数据变更日志");
indexContext.startChangeLog();
// 2. 重建索引
logger.info("全量索引数据库上的数据 ...");
long startRebulidTime = System.currentTimeMillis();
rebuild();
logger.info(" ------ 完成全量索引数据库上的数据,对应索引" + rebuildIndexName + ",耗时" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
+ " 秒 ------ ");
// 3. 索引优化 -- 是否调到变更重放完毕后做优化
logger.info("优化索引 ...");
long startOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
logger.info(" ------ 完成" + rebuildIndexName + "索引优化,耗时 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
+ " 秒 ------ ");
// TODO 字符集设置
BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));
// 4. 重放变更日志
logger.info("重放本地数据变更日志[第一阶段] ...");
long startReplay1Time = System.currentTimeMillis();
int replayChangeLogCount = replayChangeLogFirst(logReader);
logger.info(" ------ 完成[第一阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
+ ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒 ------ ");
// 5. 暂停在线索引
logger.info("暂停在线索引");
indexContext.pauseOnlineIndex();
isPauseOnline.set(true);
// 6. 设置 在线索引只做索引更新 以及 关闭 change log
logger.info("停止变更日志");
indexContext.stopChangeLog();
// 7. 继续重放 change log
logger.info("重放本地数据变更日志[第二阶段] ...");
long startReplay2Time = System.currentTimeMillis();
replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
logger.error("变更日志,处于错误的状态,统计的日志行数:" + indexContext.getWriteChangeLogCount() + ", 但实际只有:" + replayChangeLogCount);
}
logger.info(" ------ 完成[第二阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
+ ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒 ------ ");
// 8. 删除变更日志, OnlineIndex.startChangeLog 有做环境清理,这里不执行
logger.info("简单优化索引 ...");
long startSimpleOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);
logger.info(" ------ 完成" + rebuildIndexName + "索引简单优化,耗时 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
+ " 秒 ------ ");
// 9. 设置副本数 (怀疑比较耗时~~~待确认)
logger.info("设置副本数 ...");
int replicas = 3;
if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
replicas = 1;
} else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
replicas = 2;
} else {
String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
if (NumberUtils.isNumber(replicasStr)) {
replicas = NumberUtils.toInt(replicasStr);
}
}
ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);
// 执行索引切换流程
// 预发、线上环境阻塞等待2分钟同步数据后,再执行索引切换和删除旧索引逻辑
try {
if(IDCUtil.isBuildOrProduction()){
Thread.sleep(120 * 1000);
}
} catch (InterruptedException e) {
}
// 10. 别名切换
logger.info("索引切换:将" + rebuildIndexName + "设置为线上索引");
if (!indexContext.switchIndex(rebuildIndexName)) {
throw new IndexException("索引切换失败:将" + rebuildIndexName + "设置为线上索引失败");
}
// 11. 运行在线索引
logger.info("运行在线索引");
indexContext.keepRuningOnlineIndex();
isPauseOnline.set(false);
// 12. 删除原有在线索引
String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("删除原有在线索引,索引名:" + oldOnlineIndexName);
if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
throw new IndexException("删除索引失败,索引名:" + oldOnlineIndexName);
}
思考 。
如果只是简单地新建索引,完全可以这样做(使用不同的消费组) 。
1.记录时间戳 。
2.全量索引数据的数据 。
3.根据前面的时间戳找到kafka中的下标,下标得时间戳必须 < 记录的时间戳 。
4.根据上一步的下标开始索引数据 。
部署新的客户端服务调用新的es集群,检查业务是否正常。对站内查询检查搜索结果是否一致,对统计类查询查看统计结果是否一致.
上线,观察业务是否稳定.
释放旧的es集群的资源.
es升级这份工作是两年之前做的,现在来进行总结,部分细节可能会有疏漏。但是总结起来,依然后很多收获,从架构,代码细节上都有改进的空间。es重建代码可以做得更通用,然后开源出来.
最后此篇关于elasticsearch升级和索引重建。的文章就讲到这里了,如果你想了解更多关于elasticsearch升级和索引重建。的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我几乎不敢在这里问这个问题,因为它似乎应该很容易通过谷歌或 ravendb.net 获得。但是,我一直很难找到将我的 RavenDB 升级到新版本的正确方法。我目前正在运行 573 版并希望升级到 6
这周我需要升级当前版本的 DNN。我目前使用的是 2.1.1。我不想每件事都做两次,所以,我有几个问题。 是否有升级工具或某些脚本可以帮助我进行升级。 我最好安装 4.9 还是 5.0。这是生产。 如
将 Sugarcrm 从 6.2 升级到 6.3 版本时遇到问题。当我升级本地 Sugarcrm 安装时,它可以工作,但是当我开始升级我的 Sugarcrm 6.2 站点并上传升级包时,它不会上传。
有没有办法绕过 Meteor 的自动更新功能?我坚持 Downloading meteor-tool@1.3.0_3... \ 当我尝试运行现有项目,或创建一个新项目或只是运行“
我已将应用内集成到我的 Andorid 应用程序中,用于单个产品 productone。 为此,我在我的 Google Play 控制台中创建了不同的产品 ID,如下所示: 1。 productone
我在将 TeamCity 版本 2017.1.1 升级到 2017.1.2 时遇到问题。这个问题涉及 TeamCity 和 PostgreSQL 的工作。我的工作: 停止 teamcity 进程 /e
我寻找了这个问题的具体答案,但找不到——即使是在 WAMPSERVER 网站上也是如此。我确定我忽略了它。 我有 Wampserver 2.0、MySQL 5.0.51b、PHP 5.2.6 和 Ap
我使用 Ubuntu 软件中心默认的 Eclipse 3.7。 我想将 Eclipse 升级到 kepler 版本,所以我添加了 repository 我收到以下错误消息: Cannot comple
你好 我只想安装 mercurial,但对于它需要 python 2.6 的所有版本,我尝试使用 .rpm 文件,但我唯一得到的是很多充满错误的行,它告诉我:需要安装在 2.6 之前和 2.5 之后的
我完全知道 Gradle 网站上有一些页面说明了如何升级,但仅限于 4.x 及更高版本。 我正在尝试关注 tutorial制作一个简单的“我的第一个”Minecraft 模组。在其中,您被告知安装 f
我们想升级 Kerberos(服务器和客户端) 当前:1.6.3-133.27.1 目标:1.6.3-133.49.97.1 问题是如果我们用包管理器升级它,下面会发生什么? KDC 数据库 所有主要
背景 原计划 2019 年发布的 Vue3,又经过一年的再次打磨,终于于去年 9 月正式发布。随后,不少 UI 组件库都积极参与适配,去年 12 月,Element-plus(
我有一个版本为 2.3.4 的 grails 项目,我需要尽可能升级到最新版本。查看文档我意识到从 2.x 到 3.x 有巨大的变化。 问题是:从 2 到 3、从 3 到 4、从 4 到 5 逐步升级
我正在将 API 项目从 .net5 升级到 .net6 它以前工作,现在它崩溃 内部异常消息“抛出了‘Unity.Exceptions.InvalidRegistrationException’类型
我将我的项目从 expo 44 升级到 expo 45,现在我有无数这样的错误: The module 'MaterialIcons' can't be used as JSX component.
我已经升级了掌 Helm 模板(手动) 以前的片段depoloyment.yaml : apiVersion: apps/v1beta2 kind: Deployment metadata: na
我正在尝试将我的 Scala Play Framework 应用程序升级到 2.8,这涉及将 SBT 升级到 1.x。 在我的 build.propeties 我有 sbt.version=1.3.5
我想在我的 Windows 服务器上安装 PHPUnit 3.7。我遵循了各种说明 here并以 PHPUnit 3.4.1 结束。当我尝试使用以下方法再次安装它时: pear update chan
Microsoft.Net 4.5 即将推出,我想在 MS 发布最终版本时升级我的 clickonce 应用程序。 我的问题是:已经安装了 clickonce 应用程序(使用 .net 4.0)的用户
为了将 Angular 8 更新到 9,我正在按照官方文档升级。 这建议首先更新到最新版本的 angular 8,例如: ng update @angular/core@8 @angular/cli@
我是一名优秀的程序员,十分优秀!