gpt4 book ai didi

java - 如何通过 Java API 在 ElasticSearch 中重建索引

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:46:57 40 4
gpt4 key购买 nike

如标题所说...

我读了这篇文章 ( https://www.elastic.co/blog/changing-mapping-with-zero-downtime ),这个概念很棒,但我很难找到关于如何通过 JAVA API 实现它的合适引用。

我找到了这个插件:https://github.com/karussell/elasticsearch-reindex , 但似乎对我正在尝试做的事情有点矫枉过正

最佳答案

在本地一家星巴克进行一些研究后,我得出了以下结论:

假设我们已经有了索引(“old_index”)并且它有数据......现在让我们将该数据移动到我们创建的新索引(“new_index”)(可能使用不同的架构 STRING 与 INT某个字段,或者现在您决定不再希望分析或存储某个字段等)。

这里的基本思想是从现有索引(“old_index”)中检索所有数据并将其提取到新索引(“new_index”)中。但是,您需要做的事情很少:

第一步,你需要执行搜索滚动 https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

它所做的一切都比常规搜索更有效地检索结果。没有评分等。这是文档必须说的:“滚动不是为了实时用户请求,而是为了处理大量数据,例如为了将一个索引的内容重新索引到一个新索引中具有不同的配置。”

这里是关于如何使用它的 Java API 的链接:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html

Step 2. 插入时,必须使用bulk ingest。再一次,它是出于性能原因而完成的。这是批量摄取 Java API 的链接:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor

现在开始实际操作......

第 1 步。设置将从旧索引“加载”数据的滚动搜索

SearchResponse scrollResp = client.prepareSearch("old_index") // Specify index
.setSearchType(SearchType.SCAN)
.setScroll(new TimeValue(60000))
.setQuery(QueryBuilders.matchAllQuery()) // Match all query
.setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll

第 2 步。设置批量处理器。

int BULK_ACTIONS_THRESHOLD = 1000;
int BULK_CONCURRENT_REQUESTS = 1;
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Bulk Going to execute new bulk composed of {} actions", request.numberOfActions());
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("Executed bulk composed of {} actions", request.numberOfActions());
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.warn("Error executing bulk", failure);
}
}).setBulkActions(BULK_ACTIONS_THRESHOLD).setConcurrentRequests(BULK_CONCURRENT_REQUESTS).setFlushInterval(TimeValue.timeValueMillis(5)).build();

Step 3. 通过Step 1创建的scroll searcher从旧索引中读取,直到剩下mo条记录,插入到新索引中

//Scroll until no hits are returned
while (true) {
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
//Break condition: No hits are returned
if (scrollResp.getHits().getHits().length == 0) {
logger.info("Closing the bulk processor");
bulkProcessor.close();
break;
}
// Get results from a scan search and add it to bulk ingest
for (SearchHit hit: scrollResp.getHits()) {
IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
Map source = ((Map) ((Map) hit.getSource()));
request.source(source);
bulkProcessor.add(request);
}
}

第 4 步。现在是时候将指向旧索引的现有别名分配给新索引了。然后删除对旧索引的别名引用,然后删除旧索引本身。要了解如何确定分配给现有旧索引的别名,请参阅此帖子:ElasticSeach JAVA API to find aliases given index

为新索引分配别名

client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();

从旧索引中删除别名,然后删除旧索引

client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();

关于java - 如何通过 Java API 在 ElasticSearch 中重建索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31345586/

40 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com