
elasticsearch - How do I import data from a CSV file into Elasticsearch in Java without using Logstash?

Reposted. Author: 行者123. Last updated: 2023-12-03 00:00:44

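I want to import data from a CSV file into Elasticsearch, but I don't want to use Logstash. What are my options for doing this?
Are there any blogs or documentation on it?
I came across TransportClient, but I don't know where to start.
Thanks in advance.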

Best answer

This answer is late :), but here it is. It targets Elasticsearch 7.6.0.

// Holds the values of one CSV row
public class Document {
    private String id;
    private String documentName;
    private String name;
    private String title;
    private String dob;

    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getDocumentName() {
        return documentName;
    }
    public void setDocumentName(String documentName) {
        this.documentName = documentName;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getTitle() {
        return title;
    }
    public void setTitle(String title) {
        this.title = title;
    }
    public String getDob() {
        return dob;
    }
    public void setDob(String dob) {
        this.dob = dob;
    }
}




// Reads the CSV file line by line and indexes all rows with a single bulk request.
// `logger`, `client` and `Util` belong to the surrounding service class and are not shown here;
// client.bulk(request, RequestOptions.DEFAULT) is the RestHighLevelClient call signature.
// Requires imports: java.io.BufferedReader, java.io.FileReader, java.util.Arrays,
// org.elasticsearch.action.bulk.BulkRequest, org.elasticsearch.action.bulk.BulkResponse,
// org.elasticsearch.client.RequestOptions
public void bulkInsert() {
    long startTime = System.currentTimeMillis();
    logger.debug("ElasticSearchServiceImpl => bulkInsert Service Started");
    String line;
    String csvSplitBy = ",";
    // Elasticsearch index name
    String esIndex = "post";
    BulkRequest request = new BulkRequest();
    // try-with-resources closes the reader even if an exception is thrown
    try (BufferedReader br = new BufferedReader(new FileReader(<path to CSV>))) {
        while ((line = br.readLine()) != null) {
            // use comma as separator
            String[] row = line.split(csvSplitBy);
            if (row.length >= 1) {
                // fill a Document object from the CSV columns
                Document document = getDocEntity(row);
                // add each filled object to the BulkRequest
                request.add(getIndexRequest(document, esIndex));
            } else {
                // Arrays.toString prints the contents; row.toString() would only print the array's identity
                logger.info("ElasticSearchServiceImpl => bulkInsert : empty row =" + Arrays.toString(row));
            }
        }
        if (request.numberOfActions() > 0) {
            BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                logger.error("ElasticSearchServiceImpl => bulkInsert : Some records failed. Please rerun the process");
            } else {
                logger.info("ElasticSearchServiceImpl => bulkInsert : Success");
            }
        } else {
            logger.info("ElasticSearchServiceImpl => bulkInsert : No requests for BulkInsert =" + request.numberOfActions());
        }
    } catch (Exception e) {
        // pass the exception itself so the stack trace is logged
        logger.error("ElasticSearchServiceImpl => bulkInsert : Exception", e);
    }
    long endTime = System.currentTimeMillis();
    logger.info("ElasticSearchServiceImpl => bulkInsert End" + Util.DB_AVG_RESP_LOG + (endTime - startTime));
}
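
One caveat about splitting each line on a plain comma: a field that itself contains a comma inside double quotes gets cut in two, and trailing empty columns are silently dropped by String.split. The following is a minimal sketch of a quote-aware splitter, assuming RFC 4180-style quoting and that a record never spans several lines. The class name CsvLineSplitter is hypothetical and not part of the original answer.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvLineSplitter {

    // Splits one CSV line into fields, honoring double-quoted fields.
    // Inside quotes a comma is literal and "" stands for one quote character.
    // Assumption: a record never spans multiple lines.
    public static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // escaped quote inside a quoted field
                        i++;
                    } else {
                        inQuotes = false;    // closing quote
                    }
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;             // opening quote
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());      // last field, possibly empty
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical sample line, for illustration only
        System.out.println(split("report.pdf,\"Doe, Jane\",Engineer,"));
    }
}
```

To use it with the method above, the result could be converted with split(line).toArray(new String[0]) before it is passed to getDocEntity. For production data a dedicated CSV library is probably the safer choice.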


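// Maps the columns of one CSV row onto a Document.
// Requires import: java.util.UUID
public static Document getDocEntity(String[] row) throws Exception {
    Document document = new Document();
    // generate a random document id
    document.setId(UUID.randomUUID().toString());
    for (int i = 0; i < row.length; i++) {
        // only columns 0, 1, 7 and 8 are used; all other columns are ignored
        switch (i) {
            case 0:
                document.setDocumentName(row[i]);
                break;
            case 1:
                document.setName(row[i]);
                break;
            case 7:
                document.setTitle(row[i]);
                break;
            case 8:
                document.setDob(row[i]);
                break;
            default:
                break;
        }
    }
    return document;
}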
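
getDocEntity relies on fixed column positions (0, 1, 7 and 8). If the CSV file starts with a header row, which is an assumption since the original answer does not say so, the columns can be looked up by name instead. A minimal sketch; the class HeaderMapper and the column names in main are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderMapper {

    // Builds a lookup from column name to column index using the header row.
    public static Map<String, Integer> indexByName(String[] header) {
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < header.length; i++) {
            index.put(header[i].trim(), i);
        }
        return index;
    }

    // Returns the value of the named column, or null when the column is
    // unknown or the row is shorter than the header.
    public static String value(String[] row, Map<String, Integer> index, String column) {
        Integer i = index.get(column);
        return (i == null || i >= row.length) ? null : row[i];
    }

    public static void main(String[] args) {
        // Hypothetical header and data row, for illustration only
        String[] header = "doc_name,name,title,dob".split(",");
        String[] row = "report.pdf,Jane,Engineer,1990-01-01".split(",");
        Map<String, Integer> index = indexByName(header);
        System.out.println(value(row, index, "title"));
    }
}
```

With this approach, reordering or adding columns in the file does not require changing the case labels.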


// Builds the IndexRequest for one Document.
// Requires imports: java.util.HashMap, java.util.Map, org.elasticsearch.action.index.IndexRequest
public static IndexRequest getIndexRequest(Document document, String index) throws Exception {
    // field names and values of the JSON document to index
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("doc_name", document.getDocumentName());
    jsonMap.put("title", document.getTitle());
    jsonMap.put("dob", document.getDob());

    // use the generated id as the Elasticsearch document id
    return new IndexRequest(index).id(document.getId()).source(jsonMap);
}
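
To make it clearer what a bulk request amounts to, here is a sketch of the newline-delimited JSON that the REST _bulk endpoint accepts: one metadata line and one source line per index action. It uses only the JDK and reuses the field names doc_name, title and dob from getIndexRequest. The class BulkBodySketch and its helpers are hypothetical, the values are treated as plain strings, and the high-level client normally builds this payload for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BulkBodySketch {

    // Renders a Java string as a JSON string literal, or the JSON literal null.
    public static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Appends the two NDJSON lines of one "index" action: metadata, then source.
    public static void appendIndexAction(StringBuilder body, String index, String id,
                                         Map<String, String> source) {
        body.append("{\"index\":{\"_index\":").append(quote(index))
            .append(",\"_id\":").append(quote(id)).append("}}\n");
        body.append('{');
        boolean first = true;
        for (Map.Entry<String, String> e : source.entrySet()) {
            if (!first) {
                body.append(',');
            }
            body.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        body.append("}\n"); // every line, including the last, ends with a newline
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only
        Map<String, String> source = new LinkedHashMap<>();
        source.put("doc_name", "report.pdf");
        source.put("title", "Engineer");
        source.put("dob", "1990-01-01");
        StringBuilder body = new StringBuilder();
        appendIndexAction(body, "post", "1", source);
        System.out.print(body);
    }
}
```

A body built this way would be sent with an HTTP POST to the _bulk endpoint using the Content-Type application/x-ndjson; the HTTP call itself is left out of the sketch.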

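If you need to inspect the response for each individual item, you can iterate over the bulk response with the following code: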
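// bulkResponse is the BulkResponse returned by client.bulk(...) in bulkInsert()
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    // response of the individual operation
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

    switch (bulkItemResponse.getOpType()) {
        case INDEX:
        case CREATE:
            // index and create operations both return an IndexResponse
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
    }
}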

For more information, see the official documentation.

Regarding "elasticsearch - How do I import data from a CSV file into Elasticsearch in Java without using Logstash?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49362326/
