gpt4 book ai didi

java - 什么正在终止我的 Java ExecutorService

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:33:25 25 4
gpt4 key购买 nike

我最初是在 ThreadPoolExecutor 的一个更复杂的子类中看到这个问题,但我已经简化了,所以现在只包含一些额外的调试,但仍然遇到同样的问题。

import com.jthink.songkong.cmdline.SongKong;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.logging.Level;



public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
/**
* Uses the default CallerRunsPolicy when queue is full
* @param workerSize
* @param threadFactory
* @param queue
*/
public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue)
{
super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, new CallerRunsPolicy());
}

/**
* Allow caller to specify the RejectedExecutionPolicy
* @param workerSize
* @param threadFactory
* @param queue
* @param reh
*/
public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, LinkedBlockingQueue<Runnable> queue, RejectedExecutionHandler reh)
{
super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, queue, threadFactory, reh);
}

@Override
public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
return new FutureCallable<T>(callable);
}

/**
* Check not been paused
*
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
SongKong.checkIn();
}

/**
* After execution
*
* @param r
* @param t
*/
@Override
protected void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r, t);

if (t == null && r instanceof Future<?>)
{
try
{
Object result = ((Future<?>) r).get();
}
catch (CancellationException ce)
{
t = ce;
}
catch (ExecutionException ee)
{
t = ee.getCause();
}
catch (InterruptedException ie)
{
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
{
MainWindow.logger.log(Level.SEVERE, "AFTER EXECUTE---" + t.getMessage(), t);
}
}

@Override
protected void terminated()
{
//All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
MainWindow.logger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
MainWindow.userInfoLogger.severe("---Terminated:"+((SongKongThreadFactory)getThreadFactory()).getName());
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
for(StackTraceElement ste:stackTrace)
{
MainWindow.logger.log(Level.SEVERE, ste.toString());
}
for(StackTraceElement ste:stackTrace)
{
MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
}
}

@Override
public void shutdown()
{
MainWindow.logger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
MainWindow.userInfoLogger.severe("---Shutdown:"+((SongKongThreadFactory)getThreadFactory()).getName());
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
for(StackTraceElement ste:stackTrace)
{
MainWindow.logger.log(Level.SEVERE, ste.toString());
}
for(StackTraceElement ste:stackTrace)
{
MainWindow.userInfoLogger.log(Level.SEVERE, ste.toString());
}
super.shutdown();
}
}

此 ExecutorService 正在被以下类使用,允许实例异步提交任务,在所有提交的任务完成之前,ExecutorService 不应关闭。

package com.jthink.songkong.analyse.analyser;

import com.jthink.songkong.preferences.GeneralPreferences;
import com.jthink.songkong.ui.MainWindow;
import com.jthink.songkong.util.SongKongThreadFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
* Sets a timeout of each task submitted and cancel them if take longer than the timeout
*
* The timeout is set to 30 minutes, we only want to call if really broken, it should not happen under usual circumstances
*/
public class MainAnalyserService extends AnalyserService
{
//For monitoring/controlling when finished
private final AtomicInteger pendingItems = new AtomicInteger(0);
private final CountDownLatch latch = new CountDownLatch(1);

//If task has not completed 30 minutes after it started (added to queue) then it should be cancelled
private static final int TIMEOUT_PER_TASK = 30;

private static MainAnalyserService mas;

public static MainAnalyserService getInstanceOf()
{
return mas;
}

public static MainAnalyserService create(String threadGroup)
{
mas = new MainAnalyserService(threadGroup);
return mas;
}

public MainAnalyserService(String threadGroup)
{
super(threadGroup);
initExecutorService();
}

/**
Configure thread to match cpus but even if single cpu ensure have at least two threads to protect against
scenario where there is only cpu and that thread is waiting on i/o rather than being cpu bound this would allow
other thread to do something.
*/
@Override
protected void initExecutorService()
{
int workerSize = GeneralPreferences.getInstance().getWorkers();
if(workerSize==0)
{
workerSize = Runtime.getRuntime().availableProcessors();
}
//Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
{
workerSize = MIN_NUMBER_OF_WORKER_THREADS;
}
MainWindow.userInfoLogger.severe("Workers Configuration:"+ workerSize);
MainWindow.logger.severe("Workers Configuration:"+ workerSize);

executorService = new TimeoutThreadPoolExecutor(workerSize,
new SongKongThreadFactory(threadGroup),
new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
TIMEOUT_PER_TASK,
TimeUnit.MINUTES,
new EnsureIncreaseCountIfRunOnCallingThread());
}

public AtomicInteger getPendingItems()
{
return pendingItems;
}

/**
* If queue is full this gets called and we log that we run task on local calling thread.
*/
class EnsureIncreaseCountIfRunOnCallingThread implements RejectedExecutionHandler
{
/**
* Creates a {@code CallerRunsPolicy}.
*/
public EnsureIncreaseCountIfRunOnCallingThread() { }

/**
* Executes task on calling thread, ensuring we increment count
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown())
{
try
{
MainWindow.userInfoLogger.severe(">>SubmittedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" + pendingItems.get());
r.run();
MainWindow.userInfoLogger.severe(">>CompletedLocally:" + ((FutureCallable) r).getCallable().getClass().getName() + ":" + pendingItems.get());
}
catch(Exception ex)
{
MainWindow.userInfoLogger.log(Level.SEVERE, ex.getMessage(), ex);
}
}
}
}

/**
* Increase count and then Submit to ExecutorService
*
* @param callingTask
* @param task
*/
public void submit(Callable<Boolean> callingTask, Callable<Boolean> task) //throws Exception
{
//Ensure we increment before calling submit in case rejectionExecution comes into play
int remainingItems = pendingItems.incrementAndGet();
executorService.submit(task);
MainWindow.userInfoLogger.severe(">>Submitted:" + task.getClass().getName() + ":" + remainingItems);
}

public ExecutorService getExecutorService()
{
return executorService;
}

/**
* Must be called by Callable when it has finished work (or if error)
*
* @param task
*/
public void workDone(Callable task)
{
int remainingItems = pendingItems.decrementAndGet();
MainWindow.userInfoLogger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);
if (remainingItems == 0)
{
MainWindow.userInfoLogger.severe(">Closing Latch:");
latch.countDown();
}
}

/**
* Wait for latch to close, this should occur once all submitted aysync tasks have finished in some way
*
* @throws InterruptedException
*/
public void awaitCompletion() throws InterruptedException{
latch.await();
}
}

调用类有

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

对于一位客户,即使仍有任务未完成,terminated() 方法仍被调用,并且 executorservice 仅运行了 8 分钟,并且没有任务超时。我在本地也看到了这个问题

调试显示

用户日志

05/07/2019 11.29.38:EDT:SEVERE: ----G14922:The Civil War:8907617:American Songs of Revolutionary Times and the Civil War Era:NoScore
05/07/2019 11.29.38:EDT:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.SongSaver:69
05/07/2019 11.29.38:EDT:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:68
05/07/2019 11.29.38:EDT:SEVERE: >MainAnalyser Finished
05/07/2019 11.29.38:EDT:INFO: Stop

调试日志

   05/07/2019 11.29.38:EDT:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker

所以我们可以看到还有 68 个任务要完成,并且 MainAnalyser 还没有关闭闩锁,但是线程池执行器已经终止

我重写了 shutdown() 以查看它是否被调用,它没有被调用,

terminate() 被 runWorker() 调用,runWorker() 应该在循环中继续,直到队列为空,但事实并非如此,但似乎有什么东西导致它在最终进行更多检查后离开循环和 processWorkerExit()终止整个执行器(不仅仅是工作线程)

10/07/2019 07.11.51:BST:MainAnalyserService:submit:SEVERE: >>Submitted:com.jthink.songkong.analyse.analyser.DiscogsSongGroupMatcher:809
10/07/2019 07.11.51:BST:MainAnalyserService:workDone:SEVERE: >>WorkDone:com.jthink.songkong.analyse.analyser.MusicBrainzSongGroupMatcher2:808
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: ---Terminated:Worker
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.getStackTrace(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: com.jthink.songkong.analyse.analyser.TimeoutThreadPoolExecutor.terminated(TimeoutThreadPoolExecutor.java:118)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.tryTerminate(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.processWorkerExit(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
10/07/2019 07.11.51:BST:TimeoutThreadPoolExecutor:terminated:SEVERE: java.base/java.lang.Thread.run(Unknown Source)

因为 ThreadPoolExecutor 是标准 Java 的一部分,我不能(轻易地)设置断点来尝试找出它在做什么,这是 ThreadPoolExecutor 代码(标准 Jave 不是我的代码)

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

我们在 Executor 中试验了队列大小,默认情况下它是 100,因为我不希望它变得太大,因为队列任务将使用更多内存,我宁愿调用任务在队列繁忙时自行运行.但是为了解决这个问题(并消除了因为队列已满而调用 CallerRunPolicy 的需要),我将队列大小增加到 1000,这导致错误发生得更快,然后完全消除了限制并继续更快地失败

 new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),

我正在寻找 ThreadExecutorPool 的替代品并遇到了 ForkJoinPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

我注意到的一件事是,与在外部提交表单相比,ForkJoinPool 在提交给 ForkJoinPool 的任务中使用不同的方法来提交任务。我不知道为什么会这样,但想知道是否因为我正在从 Executor 运行的任务中提交任务,这是否会以某种方式导致问题?

我现在已经设法创建自己的 ThreadPoolExecutor 版本,只需将代码复制/粘贴到新类中,重命名,还必须创建一个需要我的类而不是 ThreadPoolExecutor 的 RejectedExcecutionhandler 版本,并让它运行起来。

开始添加一些调试以查看我是否可以破译正在发生的事情,有什么想法吗?

在调用 processWorkerExit 之前我添加了

 MainWindow.userInfoLogger.severe("-----------------------"+getTaskCount()
+":"+getActiveCount()
+":"+w.completedTasks
+":"+ completedAbruptly);

失败了

-----------------------3686:0:593:false

最佳答案

很长一段时间我认为问题一定出在我的代码上,然后我开始认为问题出在 ThreadPoolExecutor 上。 , 但将调试添加到我自己的 runWorker() 版本中显示问题确实是我自己的代码。

 final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
MainWindow.userInfoLogger.severe("-----------------------"+workQueue.size());

从这里我可以看出,虽然工作队列通常变得更长并且与

的值相匹配
MainThreadAnalyzer.pendingItems -noOfWorkerThreads

在一个特定的点上,这两个值出现了分歧,这就是 SongLoader 进程(我没有真正考虑到这一点)完成的时候。因此 MainThreadAnalyzer 继续提交工作,增加了 pendingItems 的值,但 Executor 的工作队列大小越来越小。

这导致我们意识到 Executor 很早就有 shutdown(),但我们没有意识到这一点,因为只有在 songloader 关闭后才检查 latch。

它关闭的原因是因为在 MainAnalyzerThread 的早期比SongLoader更快地完成工作正在提交它,因此 pendingItems 的值暂时设置为零,允许关闭闩锁。

解决方法如下

添加一个 boolean 标志以指示 songLoader 何时完成,并且仅在设置此标志后才允许关闭闩锁。

private boolean songLoaderCompleted = false;
public void workDone(Callable task)
{
int remainingItems = pendingItems.decrementAndGet();
MainWindow.logger.severe(">>WorkDone:" + task.getClass().getName() + ":" +remainingItems);

if (remainingItems == 0 && songLoaderCompleted)
{
MainWindow.logger.severe(">Closing Latch:");
latch.countDown();
}
}

然后在 SongLoader 完成后在主线程中设置此标志

 //Start SongLoader
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);

//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all folder
//submissions completed to the MainAnalyserService
songLoaderService.shutdown();
songLoaderService.awaitTermination(10, TimeUnit.DAYS);
MainWindow.userInfoLogger.severe(">Song Loader Finished");

//Were now allowed to consider closing the latch because we know all songs have now been loaded
//so no false chance of zeroes
analyserService.setSongLoaderCompleted();

//Just waits for all the async tasks on the list to complete/fail
analyserService.awaitCompletion();
MainWindow.userInfoLogger.severe(">MainAnalyser Completed");

//This should be immediate as there should be no tasks still remaining
analyserService.getExecutorService().shutdown();
analyserService.getExecutorService().awaitTermination(10, TimeUnit.DAYS);

关于java - 什么正在终止我的 Java ExecutorService,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56937085/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com