
Java ThreadPoolExecutor for indexing documents in Elasticsearch

Reposted. Author: 行者123. Updated: 2023-11-30 05:58:24

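I am new to Java's ThreadPoolExecutor, and I have written some tasks that index documents in Elasticsearch. The tasks are executed through the ThreadPoolExecutor and work fine.

However, I am still not sure about my approach.

Please find my code below.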

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class IndexApp {

    public static void main(String[] args)
    {
        // Fixed pool of two worker threads; each submitted task indexes one document.
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        Map<String, Object> jsonMap;

        System.out.println("Indexing via Java Code ....");
        Product prod1 = new Product("1001", 123172L, "Product", "VG3000");
        Product prod2 = new Product("1002", 123172L, "Series", "Valves, VG3000");
        Product prod3 = new Product("1003", 3536633L, "Series", "Activa RoofTop, VG3000 karthikeyan ");
        Product prod4 = new Product("1004", 123172L, "Product", "Activa RoofTop VG3000, 3000");

        List<Product> objList = new ArrayList<Product>();
        objList.add(prod1);
        objList.add(prod2);
        objList.add(prod3);
        objList.add(prod4);

        for (Product product : objList)
        {
            // Build a fresh map per document so tasks never share mutable state.
            jsonMap = new HashMap<String, Object>();
            jsonMap.put("id", product.getId());
            jsonMap.put("catalog_id", product.getCatalog_id());
            jsonMap.put("catalog_type", product.getCatalog_type());
            jsonMap.put("values", product.getValues());
            IndexTask task = new IndexTask(jsonMap);
            executor.execute(task);
        }
        // Stop accepting new tasks; tasks already submitted still run to completion.
        executor.shutdown();
    }

}


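import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;

public class IndexTask implements Runnable {

    private final static String INDEX_NAME = "index_prod";

    Product prod = new Product();
    IndexRequest request;
    Map<String, Object> jsonMap;

    public IndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    public void run() {
        try {
            Long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

            // One index request per document, using the document's own id.
            request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString())
                    .source(jsonMap);

            try {
                IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    // Version conflict: no special handling yet.
                }
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }

            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // keep the interrupt flag set for the pool
        }
    }

}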

Can someone tell me whether my approach makes sense for indexing documents in Elasticsearch?

Update 2

Please find my revised code below.

Instead of IndexRequest, I now use BulkRequest.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.rest.RestStatus;

public class ProdCatIndexTask implements Runnable {

    private final static String INDEX_NAME = "productcatalog_index";

    Product prod = new Product();
    IndexRequest request;
    Map<String, Object> jsonMap;

    // Each task owns its own BulkRequest, so every bulk call carries a single document.
    BulkRequest bulkRequest = new BulkRequest();

    public ProdCatIndexTask(Map<String, Object> jsonMap) {
        this.jsonMap = jsonMap;
    }

    public Map<String, Object> getJsonMap() {
        return jsonMap;
    }

    public void run() {
        try {
            Long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + jsonMap.get("id") + " Sleep Duration : " + duration);

            /*request = new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString() )
                    .source(jsonMap);*/

            bulkRequest.add(new IndexRequest(INDEX_NAME, "doc", jsonMap.get("id").toString()).source(jsonMap));

            try {
                //IndexResponse response = SearchEngineClient.getInstance3().index(request); // increased timeout
                BulkResponse bulkResponse = SearchEngineClient.getInstance3().bulk(bulkRequest);
                System.out.println("Triggered Bulk Request.....");
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    // Version conflict: no special handling yet.
                }
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }

            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // keep the interrupt flag set for the pool
        }
    }

}

Best answer

If you want to load data in bulk and in parallel, I suggest using BulkProcessor from the Elasticsearch API.

See https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html

An example of how to use BulkProcessor:

bulkProcessor.add(new IndexRequest("indexName", "type")
        .source(toJson(product), XContentType.JSON));
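
One thing worth noting about the revised code in the question: every task creates its own BulkRequest holding a single document, so each bulk call still carries only one document. The sketch below illustrates the batching idea using only the JDK. Documents are grouped into batches, and one task per batch is handed to a fixed thread pool. The names BatchedIndexingSketch, partition, indexInBatches and the sendBulk callback are hypothetical; sendBulk stands in for building one BulkRequest from the batch and sending it. The batch size, thread count and one-minute timeout are arbitrary choices, and this is not how BulkProcessor itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class BatchedIndexingSketch {

    // Split the documents into consecutive batches of at most batchSize items.
    static <T> List<List<T>> partition(List<T> docs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < docs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, docs.size());
            batches.add(new ArrayList<>(docs.subList(from, to)));
        }
        return batches;
    }

    // Submit one task per batch, wait for the pool to drain, and return the batch count.
    // sendBulk is a hypothetical stand-in for "build one BulkRequest and send it".
    static <T> int indexInBatches(List<T> docs, int batchSize, int threads,
                                  Consumer<List<T>> sendBulk) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        List<List<T>> batches = partition(docs, batchSize);
        for (List<T> batch : batches) {
            executor.execute(() -> sendBulk.accept(batch));
        }
        executor.shutdown(); // stop accepting new tasks
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // give up on tasks that are still running
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return batches.size();
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1001", "1002", "1003", "1004", "1005");
        AtomicInteger sent = new AtomicInteger();
        int requests = indexInBatches(ids, 2, 2, batch -> sent.addAndGet(batch.size()));
        System.out.println(requests + " bulk requests, " + sent.get() + " documents");
    }
}
```

With five IDs and a batch size of 2, main issues three stand-in bulk calls. BulkProcessor additionally takes care of flushing by action count, size or interval and of retrying with a backoff policy, all of which this sketch leaves out.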

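If you want to go faster, you can reduce the number of replicas to 0 and let Elasticsearch generate the IDs. When you index with your own IDs, Elasticsearch has to check every time whether a document with that ID already exists.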
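
The replica count is an index setting that can be changed through the update index settings REST endpoint (PUT /<index>/_settings). As a rough sketch, the code below builds such a request with the JDK's java.net.http classes (Java 11 or later) without sending it. The base URL http://localhost:9200 is an assumption, index_prod is the index name from the question, and ReplicaSettingsSketch, replicaRequest and settingsBody are hypothetical names. Replicas are normally set back to their original value once the initial load has finished.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class ReplicaSettingsSketch {

    // JSON body for the update index settings endpoint.
    static String settingsBody(int replicas) {
        return "{\"index\":{\"number_of_replicas\":" + replicas + "}}";
    }

    // Build (but do not send) a PUT request that sets number_of_replicas on one index.
    // baseUrl is an assumption, e.g. a local node at http://localhost:9200.
    static HttpRequest replicaRequest(String baseUrl, String index, int replicas) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + index + "/_settings"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(settingsBody(replicas)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = replicaRequest("http://localhost:9200", "index_prod", 0);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(settingsBody(0));
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

To let Elasticsearch generate the IDs, the index request is simply created without an explicit ID, as in the BulkProcessor example in this answer.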

Other ideas on how to improve loading performance:

https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html

A similar question about using a Java ThreadPoolExecutor to index documents in Elasticsearch can be found on Stack Overflow: https://stackoverflow.com/questions/52737101/
