
elasticsearch - ElasticSearch Bulkload TransportException: TransportService is closed stopped can't send request

Reposted. Author: 行者123. Updated: 2023-12-02 23:32:02

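We upgraded our Elasticsearch cluster from 2.1.1 to 2.2. Our bulk load process, which ran through a BulkProcessor on versions before 2.2.1, now throws the following exception. Please let me know if I am missing something; I am new to this.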

[main] INFO org.elasticsearch.plugins - [Masque] modules [], plugins [], sites []
[main] INFO com.zu.bids.rt.ESTest - Going to execute new bulk composed of 100 actions
[elasticsearch[Masque][listener][T#1]] WARN com.zu.bids.rt.ESTest - Error executing bulk
SendRequestTransportException[[Tutinax the Mountain-Mover][elasticsearch-master-01/10.240.0.22:9300][indices:data/write/bulk]]; nested: TransportException[TransportService is closed stopped can't send request];
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:323)
at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:51)
at org.elasticsearch.client.transport.support.TransportProxyClient$1.doWithNode(TransportProxyClient.java:58)
at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:247)
at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:46)
at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:588)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: TransportException[TransportService is closed stopped can't send request]
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:303)
... 8 more

The test code is as follows:
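// Imports were not shown in the original post; these assume Elasticsearch 2.x and SLF4J.
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ESTest {

    private static final Logger LOG = LoggerFactory.getLogger(ESTest.class);

    public static void main(String[] args) throws IOException {

        // Transport addresses of the three master nodes, as host:port pairs.
        String[] esnodes = "elasticsearch-master-01:9300,elasticsearch-master-02:9300,elasticsearch-master-03:9300"
                .split(",");
        Settings settings = Settings.builder().put("cluster.name", "zuelasticsearch")
                .put("client.transport.sniff", false).put("client.transport.ping_timeout", 20, TimeUnit.SECONDS)
                .build();

        TransportClient tclient = TransportClient.builder().settings(settings).build();
        for (String node : esnodes) {
            String[] host = node.split(":");
            tclient.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName(host[0]), Integer.parseInt(host[1])));
        }

        Client client = tclient;

        // Flush after 3000 actions, with up to 3 bulk requests in flight at once.
        BulkProcessor bulkProcessor = getBulkProcessor(client, 3000, 3);

        String[] strs = { "This", "is", "a", "good", "test" };

        // Queue 100 small documents: field name is the counter, value cycles through strs.
        for (int i = 0; i < 100; i++) {
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder().startObject();
            jsonBuilder.field(String.valueOf(i), strs[i % 5]);
            jsonBuilder.endObject();
            bulkProcessor
                    .add(client.prepareIndex("remtest", "tello", String.valueOf(i)).setSource(jsonBuilder).request());
        }

        // Shut down the processor, then the client.
        bulkProcessor.close();
        client.close();
    }

    public static BulkProcessor getBulkProcessor(Client client, int nActions, int nConcurrentRequests) {
        return BulkProcessor.builder(client, new Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                LOG.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                LOG.info("Executed bulk composed of {} actions", request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                LOG.warn("Error executing bulk", failure);
            }
        }).setBulkActions(nActions).setConcurrentRequests(nConcurrentRequests).build();
    }
}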

Best answer

I solved this problem by adding the following line to the code:

bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

before the following calls:
bulkProcessor.close();
client.close();
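
A likely reason this works, going by the BulkProcessor Javadoc for 2.x: with concurrent requests enabled, close() flushes the remaining actions but does not wait for bulk requests that are still in flight, so client.close() can stop the transport service while a request is about to be sent. awaitClose() waits up to the given timeout for those requests to complete. The sketch below is a standard-library model of that ordering problem, not Elasticsearch code; FakeTransport, ToyBulkProcessor, and run are hypothetical names, and the 100 ms delay is an arbitrary stand-in for request latency.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseOrderDemo {

    /** Hypothetical stand-in for the client's transport layer: rejects sends once closed. */
    static class FakeTransport {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        void send() {
            if (closed.get()) {
                throw new IllegalStateException("transport is closed, can't send request");
            }
        }

        void close() {
            closed.set(true);
        }
    }

    /** Toy processor: each flush holds one permit until its asynchronous send finishes. */
    static class ToyBulkProcessor {
        private final int concurrentRequests;
        private final Semaphore permits;
        private final ExecutorService pool = Executors.newCachedThreadPool();
        final AtomicInteger failed = new AtomicInteger();

        ToyBulkProcessor(int concurrentRequests) {
            this.concurrentRequests = concurrentRequests;
            this.permits = new Semaphore(concurrentRequests);
        }

        void flushAsync(FakeTransport transport) throws InterruptedException {
            permits.acquire();
            pool.execute(() -> {
                try {
                    Thread.sleep(100); // simulated delay before the request is sent
                    transport.send();
                } catch (IllegalStateException | InterruptedException e) {
                    failed.incrementAndGet();
                } finally {
                    permits.release();
                }
            });
        }

        /** Waits until every in-flight flush has released its permit, or the timeout elapses. */
        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
            return permits.tryAcquire(concurrentRequests, timeout, unit);
        }

        /** Demo-only helper: lets worker threads finish so the failure counter is final. */
        void drain() throws InterruptedException {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Runs one flush and returns the number of failed sends for the chosen shutdown order. */
    static int run(boolean waitBeforeClosingTransport) {
        try {
            FakeTransport transport = new FakeTransport();
            ToyBulkProcessor processor = new ToyBulkProcessor(3);
            processor.flushAsync(transport);
            if (waitBeforeClosingTransport) {
                processor.awaitClose(5, TimeUnit.SECONDS);
            }
            transport.close();
            processor.drain();
            return processor.failed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("close transport immediately: failed sends = " + run(false));
        System.out.println("awaitClose first: failed sends = " + run(true));
    }
}
```

In the real code it is also worth checking the boolean returned by awaitClose(): false means the timeout elapsed before all bulk requests completed, so closing the client at that point can still fail in the same way. Since awaitClose() closes the processor itself, the bulkProcessor.close() call that follows it should be a harmless no-op.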

Regarding "elasticsearch - ElasticSearch Bulkload TransportException: TransportService is closed stopped can't send request", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/35345506/
