gpt4 book ai didi

Java 8 执行器在使用 ExecutorService 读取 `100k` 文件路径时抛出 RejectedExecutionException

转载 作者:行者123 更新时间:2023-12-02 02:01:51 27 4
gpt4 key购买 nike

我正在从 ElasticSearch 索引中获取 100k 文件路径,实际文件在我的本地驱动器中可用。根据文件路径,我必须将这些文件转换为 base64。

我使用 scrollApi 获取 100k 文件路径并添加到 Arraylist 中。收集完所有文件路径后,我想将这些文件转换为base64。

为此,我想为此进程创建一个工作线程(以加快进程)。

请在下面找到我的代码。最初,我从 Elasticsearch 索引中读取文件路径,并将该文件路径传递给我的工作线程以读取路径(未完全实现转换为 Base64)。

public class DocumentIndex {

private final static String INDEX = "documents_local";
private final static String ATTACHMENT = "document_suggestion";
private final static String TYPE = "doc";
private static final Logger logger = Logger.getLogger(Thread.currentThread().getStackTrace()[0].getClassName());
private static final int BUFFER_SIZE = 3 * 1024;

public static void main(String[] args) throws IOException {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<String> filePathList = new ArrayList<String>();


RestHighLevelClient restHighLevelClient = null;
RestHighLevelClient restHighLevelClient2 = null;
Document doc=new Document();

logger.info("Started Indexing the Document.....");


//Fetching Id, FilePath & FileName from Document Index.
SearchRequest searchRequest = new SearchRequest(INDEX);
searchRequest.types(TYPE);
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(60L)); //part of Scroll API

searchRequest.scroll(scroll); //part of Scroll API
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
QueryBuilder qb = QueryBuilders.matchAllQuery();

searchSourceBuilder.query(qb);
searchRequest.source(searchSourceBuilder);

SearchResponse searchResponse = SearchEngineClient.getInstance3().search(searchRequest);
String scrollId = searchResponse.getScrollId(); //part of Scroll API
SearchHit[] searchHits = searchResponse.getHits().getHits();
long totalHits=searchResponse.getHits().totalHits;
logger.info("Total Hits --->"+totalHits);

//part of Scroll API -- Starts
while (searchHits != null && searchHits.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(scroll);
searchResponse = SearchEngineClient.getInstance3().searchScroll(scrollRequest);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();

Map<String, Object> jsonMap ;
for (SearchHit hit : searchHits) {

StringBuilder result = null;
String encodedfile = null;
File file=null;

Map<String, Object> sourceAsMap = hit.getSourceAsMap();


if(sourceAsMap != null) {
doc.setId((int) sourceAsMap.get("id"));
doc.setApp_language(String.valueOf(sourceAsMap.get("app_language")));
doc.setFilename(String.valueOf(sourceAsMap.get("filename")));
doc.setPath(String.valueOf(sourceAsMap.get("path")));
}
if(doc.getPath()!= null && doc.getFilename() != null) {
filePathList.add(doc.getPath().concat(doc.getFilename()));
}
}

logger.info("File Path List size --->"+filePathList.size());


for (int i = 0; i < filePathList.size(); i++) {
Runnable worker = new WorkerThread(filePathList.get(i));
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}
}


public class WorkerThread implements Runnable {

private String command;
private static final Logger logger = Logger.getLogger(Thread.currentThread().getStackTrace()[0].getClassName());

public WorkerThread(String s){
this.command=s;
}

@Override
public void run() {

logger.info("File Path --->"+command);
System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
processCommand();
System.out.println(Thread.currentThread().getName()+" End.");
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString(){
return this.command;
}

}

从索引读取文件路径时出现以下错误。

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task D:\data\Files\doc753_v1_fr-FR.pdf rejected from java.util.concurrent.ThreadPoolExecutor@35dab4eb[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 10]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.jci.vb2.SearchEngineUtility.DocumentIndex.main(DocumentIndex.java:104)

最佳答案

当您在 ExecutorService 上调用 shutdown 后尝试提交任务时,就会发生这种情况,因为您已经声明/创建了执行程序服务的对象ExecutorService executor = Executors .newFixedThreadPool(5);while 循环之外 while (searchHits != null && searchHits.length > 0) { .在 while 循环的第二次迭代中,它将尝试将任务提交到您已在第一次迭代中调用 shutdown 的执行器服务

要解决此问题,请在 while 循环内声明 ExecutorService executor = Executors.newFixedThreadPool(5); 或在 while 循环结束时关闭

关于Java 8 执行器在使用 ExecutorService 读取 `100k` 文件路径时抛出 RejectedExecutionException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51463567/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com