gpt4 book ai didi

java - CloseableHttpAsyncClient 以 ConnectionClosedException : Connection closed unexpectedly 终止

转载 作者:行者123 更新时间:2023-11-30 10:50:03 25 4
gpt4 key购买 nike

我正在开发一个文件下载器,它提交对大约一千个文件的获取请求。我遇到了this article这将有助于使用执行程序框架提交大量请求。我尝试运行较少数量的文件(大约一百个),它正在运行。但是,我运行的大量文件导致了 ConnectionClosedException。

这是提交请求的下载代码:

void download(String sObjname, List<FileMetadata> blobList) throws IOException, InterruptedException
{
long totalSize = 0;
this.sObjname = sObjname;
for (FileMetadata doc : blobList)
{
totalSize += doc.getSize();
doc.setStatus(JobStatus.INIT_COMPLETE);
}
totalFileSize = new AtomicLong(totalSize);

// Async client definiton; MAX_CONN around 5-15
try (CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setMaxConnPerRoute(MAX_CONN)
.setMaxConnTotal(MAX_CONN).build())
{
httpclient.start();

// Define the callback for handling the response and marking the status
FutureCallback<String> futureCallback = new FutureCallback<String>() {

@Override
public void cancelled()
{
logger.error("Task cancelled in the rest client.");
shutdownLatch.countDown();
}

@Override
public void completed(String docPath)
{
FileMetadata doc = futureMap.get(docPath);
logger.info(doc.getPath() + " download completed");
totalFileSize.addAndGet(-1 * doc.getSize());
doc.setStatus(JobStatus.WRITE_COMPLETE);
shutdownLatch.countDown();
}

@Override
public void failed(Exception e)
{
shutdownLatch.countDown();
logger.error("Exception caught under failed for " + sObjname + " " + e.getMessage(), e);
Throwable cause = e.getCause();
if (cause != null && cause.getClass().equals(ClientProtocolException.class))
{
String message = cause.getMessage();
// TODO Remove this
logger.error("Cause message: " + message);
String filePath = message.split("Unable to download the file ")[1].split(" ")[0];
futureMap.get(filePath).setStatus(JobStatus.WRITE_FAILED);
}
}

};


// Submit the get requests here
String folderPath = SalesforceUtility.getFolderPath(sObjname);
new File(new StringBuilder(folderPath).append(File.separator).append(Constants.FILES).toString()).mkdir();
String body = (sObjname.equals(Constants.contentVersion)) ? "/VersionData" : "/body";
shutdownLatch = new CountDownLatch(blobList.size());
for (FileMetadata doc : blobList)
{
String uri = baseUri + "/sobjects/" + sObjname + "/" + doc.getId() + body;
HttpGet httpGet = new HttpGet(uri);
httpGet.addHeader(oauthHeader);
doc.setStatus(JobStatus.WRITING);

// Producer definition
HttpAsyncRequestProducer producer = HttpAsyncMethods.create(httpGet);

// Consumer definition
File docFile = new File(doc.getPath());
HttpAsyncResponseConsumer<String> consumer = new ZeroCopyConsumer<String>(docFile) {
@Override
protected String process(final HttpResponse response, final File file,
final ContentType contentType) throws Exception
{
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK)
{
throw new ClientProtocolException("Unable to download the file " + file.getAbsolutePath()
+ ". Error code: " + response.getStatusLine().getStatusCode() + "; Error message: "
+ response.getStatusLine());
}
return file.getAbsolutePath();
}

};

// Execute the request
logger.info("Submitted download for " + doc.getPath());
httpclient.execute(producer, consumer, futureCallback);
futureMap.put(doc.getPath(), doc);
}

if (futureMap.size() > 0)
schedExec.scheduleAtFixedRate(timerRunnable, 0, 5, TimeUnit.MINUTES);

logger.debug("Waiting for download results for " + sObjname);
shutdownLatch.await();

}
finally
{
schedExec.shutdown();
schedExec.awaitTermination(24, TimeUnit.HOURS);
logger.debug("Finished downloading files for " + sObjname);
}
}

我收到的堆栈跟踪是:

org.apache.http.ConnectionClosedException: Connection closed unexpectedly
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:139) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) [httpasyncclient-4.1.1.jar:4.1.1]
at org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) [httpasyncclient-4.1.1.jar:4.1.1]
at org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:102) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:281) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:442) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:285) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:106) [httpcore-nio-4.4.4.jar:4.4.4]
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:590) [httpcore-nio-4.4.4.jar:4.4.4]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_72]

对于一些 worker 。

最佳答案

感谢@lucasvc,解释了默认行为here .针对我的解决方案,将代码更新为如下,并没有出现问题。

IOReactorConfig reactorConfig = IOReactorConfig.custom()
.setConnectTimeout(TIMEOUT_5_MINS_IN‌​_MILLIS)
.setSoTimeout(TIMEOUT_5_MINS_IN_MILLIS).build();

try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom()
.setDefaultIOReactorConfig(reactorC‌​onfig)
.setDefaultHeaders(Collections.singletonList(oauthHeader‌​))
.setMaxConnPerRout‌​e(MAX_CONN)
.setMaxConnTotal(MAX_CONN).build();) {
// ...
}

关于java - CloseableHttpAsyncClient 以 ConnectionClosedException : Connection closed unexpectedly 终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35207089/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com