gpt4 book ai didi

java - 生成多个线程,从单个集合中获取输入并将结果放入单个集合中

转载 作者:行者123 更新时间:2023-12-01 23:15:54 30 4
gpt4 key购买 nike

这是我想做的事情的简介,我有一个场景

  1. 每天动态生成大量文本文件。 0到每天 8 次。每个文件的大小可以从小到大。根据日的数据。
  2. 需要对其进行一些检查(业务检查)。

我计划在最短的时间内完成任务,因此尝试编写一个并行执行器来对这些文件执行检查。

我的想法是

  1. 将 n 个文件存储在并发集合 (ConcurrentLinkedQueue) 中
  2. 删除文件,生成一个线程,对文件运行所有检查
  3. 由于 1 个文件与另一个文件没有关系,我希望能够处理多个文件
  4. 将结果存储在另一个并发集合中(ConcurrentLinkedQueue ...它会转换为不同的 html pdf 报告)
  5. 注意:线程数可以与文件数不同(我希望线程数可配置,而不是文件数 = 线程数的情况)

我的理解是这样我应该能够在最短的时间内完成每日检查。

我的代码如下所示,什么让我困惑“如何在每个线程完成后将所有线程的结果存储在单个集合中”,我的直觉是我正在做一些有趣的(不正确的)事情我正在存储结果。

第二个问题想要检查是否有人预见到下面的代码片段中的任何其他问题

第三个问题这似乎是一个常见的用例(对我来说)任何指向解决此问题的设计模式代码片段的指针

注意:我使用的是 JDK 6。

public class CheckExecutor {
// to store all results of all threads here , then this will be converted to html/pdf files
static ConcurrentLinkedQueue<Result> fileWiseResult = new ConcurrentLinkedQueue<Result>();

public static void main(String[] args) {
int numberOfThreads=n; // need keep it configurable
Collection<ABCCheck> checksToExecute // will populate from business logic , ABCCheck is interface , has a method check() , there are different implementations

ConcurrentLinkedQueue<File> fileQueue = new ConcurrentLinkedQueue<File>(); // list of files for 1 day , may vary from 0 to 8
int maxNumOfFiles = fileQueue.size();

ThreadGroup tg = new ThreadGroup ("Group");
// If more number of threads than files (rare , can be considered corener case)
if (maxNumOfFiles < numberOfThreads) numberOfThreads=maxNumOfFiles;
// loop and start number of threads
for(int var=0;var<numberOfThreads;var++)
{
File currentFile = fileQueue.remove();
// execute all checks on 1 file using checksToExecute
ExecuteAllChecks checksToRun = new ExecuteAllChecks(); // business logic to populate checks
checksToRun.setchecksToExecute(checksToExecute);
checksToRun.setcheckResult(fileWiseResult); // when each check finishes want to store result here
new Thread (tg , checksToRun , "Threads for "+currentFile.getName()).start();
}

// To complete the tasak ... asap ... want to start a new thread as soon as any of current thread ends (diff files diff sizes)
while(!fileQueue.isEmpty()) {
try {
Thread.sleep(10000); // Not sure If this will cause main thread to sleep (i think it will pause current thread ) i want to pause main thread
} catch (InterruptedException e) {
e.printStackTrace();
}
// check processing of how many files completed
if( (tg.activeCount()<numberOfThreads) && (fileQueue.size()>0) ) {
int numOfThreadsToStart = numberOfThreads - tg.activeCount();
for(int var1=0;var1<numOfThreadsToStart;var1++) {
File currentFile = fileQueue.remove();
ExecuteAllchecks checksToRun = new ExecuteAllchecks();
checksToRun.setchecksToExecute(checksToExecute);
checksToRun.setcheckResult(fileWiseResult); // when each check finishes want to store result here
new Thread (tg , checksToRun , "Threads for "+currentFile.getName()).start();
}
}
}
}
}

class ExecuteAllchecks implements Runnable {

private Collection<ABCCheck> checksToExecute;
private ConcurrentLinkedQueue<Result> checkResult; // not sure if its correct , i want to store result off all threads here

public ConcurrentLinkedQueue<Result> getcheckResult() {
return checkResult;
}

// plan to instantiate the result collection globally and store result here
public void setcheckResult(ConcurrentLinkedQueue<Result> checkResult) {
this.checkResult = checkResult;
}

public Collection<ABCCheck> getchecksToExecute() {
return checksToExecute;
}

public void setchecksToExecute(Collection<ABCCheck> checksToExecute) {
this.checksToExecute = checksToExecute;
}



@Override
public void run() {
Result currentFileResult = new Result();
// TODO Auto-generated method stub
System.out.println("Execute All checks for 1 file");
// each check runs and calls setters on currentFileResult
checkResult.add(currentFileResult);
}

}

最佳答案

实际的实现很大程度上受到计算本身性质的影响,但通用的方法可能是:

private final ExecutorService executor = Executors.newCachedThreadPool();
private final int taskCount = ...;
private void process() {
Collection< Callable< Result > > tasks = new ArrayList<>( taskCount );
for( int i = 0; i < taskCount; i++ ) {
tasks.add( new Callable< Result >() {

@Override
public Result call() throws Exception {
// TODO implement your logic and return result
...
return result;
}

} );
}
List< Future< Result > > futures = executor.invokeAll( tasks );
List< Result > results = new ArrayList<>( taskCount );
for( Future< Result > future : futures ) {
results.add( future.get() );
}
}

我还建议在 future.get() 上使用合理的超时调用以便执行线程不被卡住。

不过,我也不建议在生产中使用缓存线程池,因为只要当前池没有足够的容量来完成所有任务,该池就会增加,而是使用类似 Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ) 的东西。

您的实际任务可以分为几个小任务,然后考虑检查如何使用 ForkJoin framework 有效地完成这些任务。

关于java - 生成多个线程,从单个集合中获取输入并将结果放入单个集合中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21220801/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com