gpt4 book ai didi

elasticsearch升级和索引重建。

转载 作者:我是一只小鸟 更新时间:2023-04-15 22:31:38 36 4
gpt4 key购买 nike

1.背景描述

  2020年团队决定对elasticsearch升级。es(elasticsearch缩写,下同)当前版本为0.9x,升级到5.x版本。es在本公司承载三个部分的业务,站内查询,订单数据统计,elk日志分析.

  对于站内查询和订单数据统计,当前业务架构是 。

  mysql -> canal -> kafka -> es 。

  (可以考虑使用kafka connector 代替canal) 。

2.难点

  难点是在升级的时候如何不影响当前业务.

3.具体步骤

A.部署es新集群

  下载5.x版本的es,在新的机器上部署新的集群.

B.pull代码,升级代码到es新版本

  由于从0.9x到5.x版本跨度比较大,许多java api都发生了变化,需要修复.

  一个坑是alias api 发生了语义变化,在后来的自测中修复了此问题.

C.重建索引

  我们使用索引重建程序来新建索引。重建索引具体步骤如下,我们称线上索引为online index, 新创建的索引为new index.

  1.init 。

    刷新索引名映射关系,检查当前alias只有一个物理索引.

    根据预定义的mapping,创建索引new index.

    设置在线索引记录数据变更日志,即记录线上索引消费kafka数据,并存储为change log文件. 。

  2.全量索引数据库上的数据到new index 。

    从mysql查出数据同步到es中,如果有多个分表,就按照表顺序同步。可以开启多线程批量插入.

  3.对new index索引优化 。

    refresh, flush 索引。调用force-merge api,进行段合并.

  4.重放change log到new index中 。

    根据change log 转换为es query,写入到new index。     。

  5.暂停线上索引的写入 。

    因为online index和new index 使用的是相同的kafka consumer group,所以必须停掉online index的消费功能.

  6.关闭change log 。

    停止记录在线索引记录数据变更日志.

  7.第二阶段重放change log 。

    根据change log 转换为es query,写入到new index。  。

  8.删除change log  。

    删除线索引记录数据变更日志.

  9.设置副本数  。

    new index创建索引的时候默认副本数为0,现在动态调整副本数为业务需要的值。比如对现实搜索业务设置两个副本,对订单统计类索引不需要副本.

                      
                        PUT /new_index/_settings
{
    "number_of_replicas": 2
}
                      
                    

    此阶段可能会比较耗时,需要等待几分钟才能进行下一步操作。更好的做法是调用health api 查看分片状态.

                      
                        GET _cluster/health

{
  "cluster_name" : "testcluster",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 1,
  "active_shards" : 1,
  "relocating_shards" : 0, // 重新定位的分片
  "initializing_shards" : 0, // 初始化中的分片
  "unassigned_shards" : 1, // 未分配的分片
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 50.0
}
                      
                    

  10.别名切换  。

                      
                        POST /_aliases
{
    "actions": [
        { "remove": { "index": "online_index", "alias": "my_index" }},
        { "add":    { "index": "new_index", "alias": "my_index" }}
    ]
}
                      
                    

  11.运行在线索引 (从kafka里面读取数据) 。

    new_index 开始从kafka里面消费最新数据。由于之前的操作可能会有延时,需要等待几分钟才能同步到最新数据.

  12.删除旧的索引 。

    删除old_index 。

详细代码步骤如下 。

                      
                                // 1.init
        logger.info("初始化");
        ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
        logger.info("刷新索引名映射关系");
        if (!indexContext.refreshIndexName()) {
            throw new IndexException("刷新索引映射关系失败");
        }

        rebuildIndexName = indexContext.getPhysicalRebuildIndexName();

        logger.info("初始化重建索引环境,当前重建索引名:" + rebuildIndexName);
        logger.info("创建索引,索引名:" + rebuildIndexName);
        boolean isCreate = false;
        try {
            isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
        } catch (Throwable t) {
            logger.info("创建索引失败,本次失败可以不处理,将会自动重试 ...");
        }

        logger.info("设置在线索引记录数据变更日志");
        indexContext.startChangeLog();

        // 2. 重建索引
        logger.info("全量索引数据库上的数据 ...");
        long startRebulidTime = System.currentTimeMillis();
        rebuild();
        logger.info(" ------  完成全量索引数据库上的数据,对应索引" + rebuildIndexName + ",耗时" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
            + " 秒    ------  ");

        // 3. 索引优化 -- 是否调到变更重放完毕后做优化
        logger.info("优化索引 ...");
        long startOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
        logger.info(" ------  完成" + rebuildIndexName + "索引优化,耗时 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
            + " 秒    ------  ");

        // TODO 字符集设置
        BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));

        // 4. 重放变更日志
        logger.info("重放本地数据变更日志[第一阶段] ...");
        long startReplay1Time = System.currentTimeMillis();
        int replayChangeLogCount = replayChangeLogFirst(logReader);
        logger.info(" ------  完成[第一阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
            + ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒    ------  ");

        // 5. 暂停在线索引
        logger.info("暂停在线索引");
        indexContext.pauseOnlineIndex();
        isPauseOnline.set(true);

        // 6. 设置 在线索引只做索引更新 以及 关闭 change log
        logger.info("停止变更日志");
        indexContext.stopChangeLog();

        // 7. 继续重放 change log
        logger.info("重放本地数据变更日志[第二阶段] ...");
        long startReplay2Time = System.currentTimeMillis();
        replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
        if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
            logger.error("变更日志,处于错误的状态,统计的日志行数:" + indexContext.getWriteChangeLogCount() + ", 但实际只有:" + replayChangeLogCount);
        }
        logger.info(" ------  完成[第二阶段]的变更日志重放,行数" + replayChangeLogCount + " 耗时 "
            + ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒    ------  ");

        // 8. 删除变更日志, OnlineIndex.startChangeLog 有做环境清理,这里不执行
        logger.info("简单优化索引 ...");
        long startSimpleOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);

        logger.info(" ------  完成" + rebuildIndexName + "索引简单优化,耗时 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
            + " 秒    ------  ");

        // 9. 设置副本数 (怀疑比较耗时~~~待确认)
        logger.info("设置副本数 ...");
        int replicas = 3;
        if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
            replicas = 1;
        } else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
            replicas = 2;
        } else {
            String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
            if (NumberUtils.isNumber(replicasStr)) {
                replicas = NumberUtils.toInt(replicasStr);
            }
        }
        ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);

        // 执行索引切换流程
        // 预发、线上环境阻塞等待2分钟同步数据后,再执行索引切换和删除旧索引逻辑
        try {
            if(IDCUtil.isBuildOrProduction()){
                Thread.sleep(120 * 1000);
            }
        } catch (InterruptedException e) {
        }
        // 10. 别名切换
        logger.info("索引切换:将" + rebuildIndexName + "设置为线上索引");
        if (!indexContext.switchIndex(rebuildIndexName)) {
            throw new IndexException("索引切换失败:将" + rebuildIndexName + "设置为线上索引失败");
        }

        // 11. 运行在线索引
        logger.info("运行在线索引");
        indexContext.keepRuningOnlineIndex();
        isPauseOnline.set(false);

        // 12. 删除原有在线索引
        String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
        logger.info("删除原有在线索引,索引名:" + oldOnlineIndexName);
        if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
            throw new IndexException("删除索引失败,索引名:" + oldOnlineIndexName);
        }
                      
                    

思考 。

如果只是简单地新建索引,完全可以这样做(使用不同的消费组)  。

  1.记录时间戳  。

  2.全量索引数据的数据 。

  3.根据前面的时间戳找到kafka中的下标,下标得时间戳必须 < 记录的时间戳 。

  4.根据上一步的下标开始索引数据 。

D.使用新集群进行业务测试

  部署新的客户端服务调用新的es集群,检查业务是否正常。对站内查询检查搜索结果是否一致,对统计类查询查看统计结果是否一致.

E.发布线上客户端搜索代码,修改es地址为新集群地址

  上线,观察业务是否稳定.

F.下线旧的 es集群

  释放旧的es集群的资源.

4.总结

  es升级这份工作是两年之前做的,现在来进行总结,部分细节可能会有疏漏。但是总结起来,依然后很多收获,从架构,代码细节上都有改进的空间。es重建代码可以做得更通用,然后开源出来.

最后此篇关于elasticsearch升级和索引重建。的文章就讲到这里了,如果你想了解更多关于elasticsearch升级和索引重建。的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com