
java - Bulk upload a CSV file to Elasticsearch using the Java API


I want to bulk upload a CSV file to Elasticsearch using the Java API (without using Logstash).

Elasticsearch version: 6.6

I have tried the following program, which uses Jackson's CSV data format to build the source map for each IndexRequest. Since I cannot predefine POJO fields for arbitrary CSV files, I parse each row into a dynamic Map instead.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class BulkImport {

    private static final Logger logger = Logger.getLogger(BulkImport.class.getName());

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        logger.info("ElasticSearchServiceImpl => bulkInsert Service Started");
        FileInputStream fis = null;

        BulkRequest request;
        RestHighLevelClient client;
        // Elasticsearch index name
        String esIndex = "post";
        try {
            boolean isHeaderSet = false;
            Set<String> header = new HashSet<String>();

            fis = new FileInputStream("/Users/test/Documents/Test.csv");
            request = new BulkRequest();
            MappingIterator<Map<String, Object>> data = parse(fis);
            while (data.hasNext()) {
                Map<String, Object> value = data.next();
                if (!isHeaderSet) {
                    // Capture the CSV column names from the first row
                    header.addAll(value.keySet());
                    isHeaderSet = true;
                }
                System.out.println(value);
                request.add(getIndexRequest(value, esIndex));
            }
            fis.close();

            if (request.numberOfActions() > 0) {
                String hostsInString = "localhost";
                List<HttpHost> httpHosts = new ArrayList<HttpHost>();
                String[] hosts = hostsInString.split(",");
                for (String host : hosts) {
                    httpHosts.add(new HttpHost(host, 9200, "http"));
                }
                client = new RestHighLevelClient(RestClient.builder(
                        httpHosts.toArray(new HttpHost[]{}))
                        .setMaxRetryTimeoutMillis(10 * 60000)
                        .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                            @Override
                            public RequestConfig.Builder customizeRequestConfig(
                                    RequestConfig.Builder requestConfigBuilder) {
                                return requestConfigBuilder
                                        .setConnectTimeout(60000)
                                        .setSocketTimeout(10 * 60000);
                            }
                        }));
                // Create the index with a "text plus keyword sub-field" mapping for every CSV column
                CreateIndexRequest crrequest = new CreateIndexRequest(esIndex);
                Map<String, Object> jsonMap = new HashMap<>();
                Map<String, Object> message = new HashMap<>();
                message.put("type", "text");
                Map<String, Object> keyword = new HashMap<>();
                Map<String, Object> type = new HashMap<>();
                type.put("type", "keyword");
                type.put("ignore_above", 256);
                keyword.put("keyword", type);
                message.put("fields", keyword);
                Map<String, Object> properties = new HashMap<>();
                for (String hdr : header) {
                    properties.put(hdr, message);
                }
                Map<String, Object> mapping = new HashMap<>();
                mapping.put("properties", properties);
                jsonMap.put("_doc", mapping);
                crrequest.mapping("_doc", jsonMap);
                CreateIndexResponse createIndexResponse = client.indices().create(crrequest, RequestOptions.DEFAULT);
                boolean acknowledged = createIndexResponse.isAcknowledged();
                System.out.println(acknowledged);
                BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
                if (bulkResponse.hasFailures()) {
                    logger.info("ElasticSearchServiceImpl => bulkInsert : Some of the records have failed. Please reinitiate the process");
                } else {
                    logger.info("ElasticSearchServiceImpl => bulkInsert : Success");
                }
            } else {
                logger.info("ElasticSearchServiceImpl => bulkInsert : No request for BulkInsert = " + request.numberOfActions());
            }
        } catch (Exception e) {
            logger.info("ElasticSearchServiceImpl => bulkInsert : Exception = " + e.getMessage());
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        logger.info("ElasticSearchServiceImpl => bulkInsert End " + (endTime - startTime));
    }
    public static MappingIterator<Map<String, Object>> parse(FileInputStream input) throws Exception {
        return readObjectsFromCsv(input);
        //writeAsJson(data);
    }

    public static MappingIterator<Map<String, Object>> readObjectsFromCsv(FileInputStream file) throws IOException {
        // Read the first CSV line as the header so every row maps column name -> value
        CsvSchema bootstrap = CsvSchema.emptySchema().withHeader().withColumnSeparator(',');
        CsvMapper csvMapper = new CsvMapper();
        MappingIterator<Map<String, Object>> mappingIterator = csvMapper
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                .readerFor(Map.class)
                .with(bootstrap)
                .readValues(file);
        // System.out.println("Column names: " + mappingIterator.next().keySet());
        return mappingIterator;
    }

    public static void writeAsJson(List<Map<?, ?>> data) throws IOException, JSONException {
        ObjectMapper mapper = new ObjectMapper();
        String value = mapper.writeValueAsString(data);
        JSONArray json = new JSONArray(value);
        System.out.println(json);
    }

    public static IndexRequest getIndexRequest(Map<String, Object> data, String index) throws Exception {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(UUID.randomUUID().toString())
                .source(data);
        System.out.println(indexRequest.toString());
        return indexRequest;
    }
}

I get the following exception when I run the program:

    {Document Name=dhjajga, Title=sdas, Name=asd, DOB=14-43-22}
index {[post][null][c2148857-87e0-4407-b5f5-b4f5f52c40d2], source[{"Document Name":"dhjajga","Title":"sdas","Name":"asd","DOB":"14-43-22"}]}
Jun 11, 2020 4:06:18 PM com.zoho.dedupe.connection.DukeESConnection connect
INFO: Client org.elasticsearch.client.RestHighLevelClient@7c51f34b
true
Jun 11, 2020 4:06:18 PM BulkImport main
INFO: ElasticSearchServiceImpl => bulkInsert : Exception =Validation Failed: 1: type is missing;2: type is missing;3: type is missing;
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;2: type is missing;3: type is missing;
at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:612)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1728)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1694)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:470)
at BulkImport.main(BulkImport.java:85)
Jun 11, 2020 4:06:18 PM BulkImport main
INFO: ElasticSearchServiceImpl => bulkInsert End 1432

When I insert the same document with curl, it works fine:

curl -X POST "localhost:9200/post/_doc/?pretty" -H 'Content-Type: application/json' -d'
{
"Document Name":"dhjajga","Title":"sdas","Name":"asd","DOB":"14-43-22"
}
'

{
"_index" : "post",
"_type" : "_doc",
"_id" : "jBPronIB0Wb3XTTasBjG",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1
}

Please help me fix the problem in the Java program. Thanks in advance.

Best Answer

Before Elasticsearch 7, every IndexRequest must carry a document type; the bulk request is rejected during client-side validation ("type is missing", once per action) because each IndexRequest is created without one. Using the type "_doc" is recommended, since that is what Elasticsearch 7+ assumes by default.
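A minimal sketch of the fix, assuming the 6.x high-level client, which accepts the type as the second constructor argument of IndexRequest:

public static IndexRequest getIndexRequest(Map<String, Object> data, String index) {
    // "_doc" must match the type used when the index mapping was created
    return new IndexRequest(index, "_doc")
            .id(UUID.randomUUID().toString())
            .source(data);
}

With a type set on every request, the BulkRequest passes validation and the documents are indexed.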

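The single-document curl in the question succeeds because the URL path (/post/_doc) already carries the type. The _bulk REST endpoint needs the same information in the metadata line of each action, which is what the Java client validates before sending. For illustration, the bulk equivalent of that request:

curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/x-ndjson' -d'
{ "index" : { "_index" : "post", "_type" : "_doc" } }
{ "Document Name" : "dhjajga", "Title" : "sdas", "Name" : "asd", "DOB" : "14-43-22" }
'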
Regarding "java - Bulk upload a CSV file to Elasticsearch using the Java API", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62322480/
