
java - SocketTimeoutException when retrieving data from or inserting data into Elasticsearch with the Rest High Level Client

Reposted. Author: 行者123. Updated: 2023-11-29 02:56:35

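I am getting a SocketTimeoutException while retrieving data from and inserting data into Elasticsearch. It happens at around 10-30 requests per second. The requests are a mix of get and put operations.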

Here is my Elasticsearch configuration:

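  • 3 master nodes, 4 GB RAM each
  • 2 data nodes, 8 GB RAM each
  • An Azure load balancer in front of the data nodes above (only port 9200 appears to be open on it). The Java client connects through this load balancer because it is the only publicly exposed endpoint.
  • Elasticsearch version: 7.2.0
  • Rest High Level Client: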

    <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.2.0</version>
    </dependency>

    <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.2.0</version>
    </dependency>

Index information:

  • Index shards: 2
  • Index replicas: 1
  • Total index fields: 10000
  • Index size as shown in Kibana: total 27.2 MB, primaries 12.2 MB
  • Index structure:
    {
    "dev-index": {
    "mappings": {
    "properties": {
    "dataObj": {
    "type": "object",
    "enabled": false
    },
    "generatedID": {
    "type": "keyword"
    },
    "transNames": { //it's array of string
    "type": "keyword"
    }
    }
    }
    }
    }
  • Dynamic mapping is disabled.

Below is my Elasticsearch configuration class. It defines two client beans, one for reading from and one for writing to Elasticsearch.

ElasticConfig.java:

@Configuration
public class ElasticConfig {

@Value("${elastic.host}")
private String elasticHost;

@Value("${elastic.port}")
private int elasticPort;

@Value("${elastic.user}")
private String elasticUser;

@Value("${elastic.pass}")
private String elasticPass;

@Value("${elastic-timeout:20}")
private int timeout;

@Bean(destroyMethod = "close")
@Qualifier("readClient")
public RestHighLevelClient readClient(){

final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));

RestClientBuilder builder = RestClient
.builder(new HttpHost(elasticHost, elasticPort))
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
);

builder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(10000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(0)
);

RestHighLevelClient restClient = new RestHighLevelClient(builder);
return restClient;
}

@Bean(destroyMethod = "close")
@Qualifier("writeClient")
public RestHighLevelClient writeClient(){

final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));

RestClientBuilder builder = RestClient
.builder(new HttpHost(elasticHost, elasticPort))
.setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
);

builder.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(10000)
.setSocketTimeout(60000)
.setConnectionRequestTimeout(0)
);

RestHighLevelClient restClient = new RestHighLevelClient(builder);
return restClient;
}

}

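Here is the function that calls Elasticsearch. If the data is available in Elasticsearch it is returned; otherwise the data is generated and put into Elasticsearch.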

public Object getData(Request request) {

DataObj elasticResult = elasticService.getData(request);
if(elasticResult!=null){
return elasticResult;
}
else{
//code to generate data
DataObj generatedData = getData();//some function which will generated data
//put above data into elastic by Async call.
elasticAsync.putData(generatedData);
return generatedData;
}
}

The getData function in ElasticService.java:

@Service
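public class ElasticService {

// Logger declared the same way as in ElasticAsync; the original snippet used LOGGER without declaring it.
private static final Logger LOGGER = Logger.getLogger(ElasticService.class.getName());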

@Value("${elastic.index}")
private String elasticIndex;

@Autowired
@Qualifier("readClient")
private RestHighLevelClient readClient;

public DataObj getData(Request request){
String generatedId = request.getGeneratedID();

GetRequest getRequest = new GetRequest()
.index(elasticIndex) //elastic index name
.id(generatedId); //retrieving by index id from elastic _id field (as key-value)

DataObj result = null;
try {
GetResponse response = readClient.get(getRequest, RequestOptions.DEFAULT);
if(response.isExists()) {
ObjectMapper objectMapper = new ObjectMapper();
result = objectMapper.readValue(response.getSourceAsString(), DataObj.class);
}
} catch (Exception e) {
LOGGER.error("Exception occurred during fetch from elastic !!!!", e);
}
return result;
}

}

The asynchronous putData function in ElasticAsync.java:

@Service
public class ElasticAsync {

private static final Logger LOGGER = Logger.getLogger(ElasticAsync.class.getName());

@Value("${elastic.index}")
private String elasticIndex;

@Autowired
@Qualifier("writeClient")
private RestHighLevelClient writeClient;

@Async
public void putData(DataObj generatedData){
ElasticVO updatedRequest = toElasticVO(generatedData);//ElasticVO matches to the structure of index given above.

try {
ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(updatedRequest);

IndexRequest request = new IndexRequest(elasticIndex);
request.id(generatedData.getGeneratedID());
request.source(jsonString, XContentType.JSON);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
request.timeout(TimeValue.timeValueSeconds(5));
IndexResponse indexResponse = writeClient.index(request, RequestOptions.DEFAULT);
LOGGER.info("response id: " + indexResponse.getId());

} catch (Exception e) {
LOGGER.error("Exception occurred during saving into elastic !!!!",e);
}


}

}

Below is part of the stack trace from an exception that occurred while saving data to Elasticsearch:

2019-07-19 07:32:19.997 ERROR [data-retrieval,341e6ecc5b10f3be,1eeb0722983062b2,true] 1 --- [askExecutor-894] a.c.s.a.service.impl.ElasticAsync        : Exception occurred during saving into elastic !!!!

java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]


Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
... 1 common frames omitted

Below is part of the stack trace from an exception that occurred while retrieving data from Elasticsearch:

2019-07-19 07:22:37.844 ERROR [data-retrieval,104cf6b2ab5b3349,b302d3d3cd7ebc84,true] 1 --- [o-8080-exec-346] a.c.s.a.service.impl.ElasticService      : Exception occurred during  fetch from elastic !!!! 

java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1433) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1403) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1373) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
at org.elasticsearch.client.RestHighLevelClient.get(RestHighLevelClient.java:699) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]



Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
... 1 common frames omitted

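I went through several Stack Overflow posts and Elastic-related blogs, which suggested that this problem could be caused by the RAM or the cluster configuration of Elasticsearch. I then reduced the number of shards from 5 to 2, since there are only two data nodes, and increased the data nodes' RAM from 4 GB to 8 GB, because I had read that Elasticsearch uses only 50% of the total RAM. The exceptions became less frequent, but the problem persists.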

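What are the possible ways to solve this? What am I missing in the Java or Elasticsearch configuration that causes this SocketTimeoutException to be thrown so often? Let me know if you need more details about the configuration.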

Best answer

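We had the same issue, and after quite some digging I found the root cause: a mismatch between the configuration of the firewall that sits between the client and the Elasticsearch servers and the kernel's TCP keepalive configuration.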

The firewall drops idle connections after 3600 seconds. The problem was that the kernel parameter for TCP keepalive was set to 7200 seconds (the default in RedHat 6.x/7.x):

sysctl -n net.ipv4.tcp_keepalive_time
7200

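As a result, the connection is dropped before a keepalive probe is ever sent. The asyncHttpClient inside the Elasticsearch HTTP client does not seem to handle dropped connections well; it simply waits until the socket timeout expires.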
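To make the timing mismatch concrete, here is a minimal sketch that compares the two values from this answer. The class and method names are made up for illustration; the only facts taken from the answer are the 7200-second kernel keepalive time and the 3600-second firewall idle timeout.

```java
public class KeepAliveCheck {

    /**
     * Returns true if the first TCP keepalive probe is sent before the
     * network device in the middle drops the idle connection.
     */
    static boolean probeArrivesBeforeDrop(long keepaliveTimeSeconds, long idleTimeoutSeconds) {
        return keepaliveTimeSeconds < idleTimeoutSeconds;
    }

    public static void main(String[] args) {
        long kernelKeepaliveSeconds = 7200; // net.ipv4.tcp_keepalive_time from the answer
        long firewallIdleSeconds = 3600;    // firewall idle timeout from the answer

        // The probe would be sent at 7200 s, but the firewall already dropped
        // the connection at 3600 s, so this prints false.
        System.out.println(probeArrivesBeforeDrop(kernelKeepaliveSeconds, firewallIdleSeconds));
    }
}
```

With a keepalive time below the idle timeout (for example 300 seconds against 3600), the same check returns true, which is the situation the fix aims for.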

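So check whether any network device between the client and the server (load balancer, firewall, proxy, etc.) has a session timeout or similar, and either increase that timeout or lower the tcp_keepalive_time kernel parameter.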
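Note that the kernel parameter only matters for sockets that have SO_KEEPALIVE enabled. The sketch below is not from the original answer; it shows, on a plain java.net.Socket and assuming Java 11 or later, how keepalive can be switched on and how the idle time can be lowered per socket where the platform supports it. The class name and the 300-second value are illustrative. For the Elasticsearch REST client itself, the corresponding switch is most likely the setSoKeepAlive(true) option on the IOReactorConfig builder that the question's configuration already uses; treat that as something to verify against your client version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import jdk.net.ExtendedSocketOptions;

public class KeepAliveSocketDemo {

    /**
     * Enables SO_KEEPALIVE and, if the platform exposes it, sets the
     * per-socket idle time before the first keepalive probe is sent.
     */
    static void enableKeepAlive(Socket socket, int idleSeconds) {
        try {
            socket.setKeepAlive(true);
            // TCP_KEEPIDLE is a JDK-specific extended option; skip it where unsupported.
            if (socket.supportedOptions().contains(ExtendedSocketOptions.TCP_KEEPIDLE)) {
                socket.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, idleSeconds);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates an unconnected socket, enables keepalive, and reports the resulting flag. */
    static boolean keepAliveCanBeEnabled() {
        try (Socket socket = new Socket()) {
            enableKeepAlive(socket, 300); // 300 s is an illustrative value below the 3600 s firewall timeout
            return socket.getKeepAlive();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("SO_KEEPALIVE enabled: " + keepAliveCanBeEnabled());
    }
}
```

Setting the idle time per socket avoids changing the kernel default for every process on the host, at the cost of relying on a JDK-specific option.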

Regarding "java - SocketTimeoutException when retrieving data from or inserting data into Elasticsearch with the Rest High Level Client", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57109948/
