gpt4 book ai didi

Java - ExecutorService 有最大尺寸

转载 作者:行者123 更新时间:2023-11-29 07:29:34 26 4
gpt4 key购买 nike

有什么方法可以通过一个巨大的数据库并并行应用一些作业来处理条目?我尝试使用 ExecutorService,但我们必须关闭()才能知道池大小...

所以我最好的解决方案是:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest)
{
return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
int dbOffset = 0;
int nbOfArticlesPerRequest = 100;
int MYTHREADS = 10;
int loopIndex = 0;
boolean bContinue=true;
Runnable worker;



while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}

executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}

if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
}



public static class MyRunnable implements Runnable {

private final String id;

MyRunnable(String id) {
this.id = id;
}

@Override
public void run()
{
System.out.println("Thread '"+id+"' started");
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread '"+id+"' stopped");
}
}
}

这工作正常,但缺点是在循环的每个结束时我都需要等待最后一个线程完成。

例如:当只有 3 个线程在运行时......

为了解决这个问题,我做了以下操作,但这样“安全”/正确吗?

顺便说一句:有没有办法知道队列中有多少任务/线程?

    int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;

ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}

while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}

if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}

executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}

编辑:

我尝试启动 1 或 1000 万个任务,所以(我假设)我不能将它们全部放入队列中......这就是为什么我使用全局执行器以便能够始终有一些线程队列(为此我无法关闭执行程序,否则它不再可用)。

最新代码版本:

    int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;

ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executorPool.execute(worker);
}

while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2))
{
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);

if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
break;
}

TimeUnit.MILLISECONDS.sleep(2000);
}

if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}

executorPool.shutdown();
// Wait until all threads are finish
while (!executorPool.isTerminated()) {
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}

提前致谢

最佳答案

更新

现在我很清楚你主要担心的是你不能一次提交 1000 万个任务。

不用怕,全部提交到executor即可。并行运行的实际任务量受底层线程池大小的限制。也就是说,如果您的池大小为 2,则此时只有两个任务正在执行,其余任务坐在队列中等待空闲线程。

默认情况下,Executors.newFixedThreadPool() 创建具有 Integer.MAX_VALUE 大小队列的执行器,因此您的数百万个任务将适合那里。


您可以使用返回FutureExecutorService.submit() 方法。然后您可以检查 Future 任务的状态(即使用 isDone()isCancelled() 方法)。

Executor 通常是您不想明确关闭的东西,它存在于您的整个应用程序生命周期中。使用这种方法,您无需关机即可了解有多少任务待处理。

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {
Future<?> task = executorService.submit(() -> {
// do work
});
tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " task are still pending");

此外,请注意任务和线程不是可互换的术语。在您的情况下,执行程序具有固定数量的线程。您可以提交比这更多的任务,但其余任务将位于执行程序的队列中,直到有一个空闲线程来运行该任务。

关于Java - ExecutorService 有最大尺寸,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44827159/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com