- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我遇到了一个问题,如果我尝试调整 ThreadPoolExecutor
的大小创建池后,将核心池大小设置为不同的数字,然后间歇性地,某些任务被拒绝并返回 RejectedExecutionException
即使我从来没有提交超过 queueSize + maxPoolSize
任务数。
我试图解决的问题是扩展 ThreadPoolExecutor
它根据线程池队列中的挂起执行调整其核心线程的大小。我需要这个,因为默认情况下是 ThreadPoolExecutor
将创建一个新的 Thread
仅当队列已满时。
这是一个小型的自包含 Pure Java 8 程序,用于演示该问题。
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
最佳答案
这是发生这种情况的一个场景:
在我的示例中,我使用 minThreads = 0、maxThreads = 2 和 queueCapacity = 2 使其更短。
第一个命令被提交,这是在方法 execute 中完成的:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
关于Java 线程池执行器 : Updating core pool size dynamically rejects incoming tasks intermittently,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60629936/
该特定代码块的最后一行产生错误“需要左值作为赋值的左操作数”。令人困惑的是为什么最后一行抛出此错误,而倒数第二行却没有。 int p2 = 0; spage = find(in
我想从有金额表的收入表中获取总收入金额 @ColumnInfo(name = "Amount") private int amount; 在我的 IncomeDao 中 @Query("S
我使用emsdk在Ubuntu 14.04 Docker容器上安装了Emscripten,如下所示: emsdk install emscripten-incoming emsdk install c
来自 rust 标准网络库: let listener = TcpListener::bind(("127.0.0.1", port)).unwrap(); info!("Opened socket
我将函数应用配置为期望来自客户端的证书。我今天在 Azure 门户中没有看到该选项。你知道突然发生了什么吗? 最佳答案 顺便说一句,现在该选项已移至“设置”->“配置”->“常规设置”->“需要传入证
我在 docker 容器上运行 fluentd 图像。当我使用 telnet(或 netcat)打开 TCP 连接并发送“消息”字符串时,会出现以下消息: 2017-01-24 10:22:00 +0
是否可以在可穿戴 Android 应用程序中监听传入通知?我尝试实现 NotificationListenerService,但该服务的 onNotificationPosted() 从未被调用: p
我在我的 Slack 工作区中创建了一个传入 Webhook。我正在使用第三方工具将 JSON 对象发布到 Hook url。我想向@user_1 发送通知 我的问题是通知发送到我和该用户@user_
我想在我的 Android 设备上使用 iptables 阻止所有传入连接。我知道 Google 应用程序,如 Play 商店等,需要一些传入连接(例如 GTalkSerivice),它们在阻塞后将不
我正在尝试构建一个基本数据库来帮助筛选许多 CSV 银行交易。 我已经将我所有的交易导入到多个表格中,这些表格来自不同的账户,这些账户的“扣除”列我已经勾选了“假”或“真” 我正在尝试执行“创建 Vi
这个问题在这里已经有了答案: 关闭 11 年前。
我正在使用 arduino,我想储存一些来自网络服务的字符。代码将自行解释: void loop() { static int i = 0; static int count = 0;
我可以在 Scapy 中只嗅探传入或传出数据包吗? 没有在数据包字段上添加过滤器。 最佳答案 简短的回答:没有。 Scapy 的嗅探功能不区分传入和传出数据包。如果你想根据源mac过滤,你可以这样做:
我在 J2ME 工作,想制作一个应用程序来阻止来自某些特定号码的来电。是否可以使用 J2ME 中的编程来阻止某些号码? 最佳答案 不,在 J2ME 中绝对没有办法做到这一点。 关于java - J2M
我有一个应用程序 android,它向服务器 (PC) 发送数据,但我没有收到从 PC 到应用程序的任何数据。而且,我怎样才能为传入的 UDP 消息做一个监听器? 因为我需要一个应用程序即使关闭也能一
我正在测试的单元是一个 IHostedService,它使用来自 Microsoft.Azure.ServiceBus 的 IQueueClient 进行通信。据我所知,没有像 .Receive()
IntelliJ IDEA 提供了一个 Preview Diff用于显示更改的压缩 View 的传出更改。我找不到 的等效项传入更改 .每个单个文件都必须在一个额外的窗口中与以前的修订版或本地更改进行
我正在使用连接在 10G 以太网链路上的专用硬件。我有一些关于处理传入数据报的问题,如下: 如果 NIC 发现不正确的链路级以太网 CRC 会怎样?一些搜索表明错误可能不会被可靠地报告(例如 here
我目前正在编写某种框架,允许其他人为其编写 REST Controller 。当然,我希望那些“其他人”尽可能少地与我的代码中发生的事情进行交互。 具体来说,我想要并且需要访问请求数据(即在请求由其余
我想使用 Google Drive Android API 列出和访问用户的传入项目,但我在 Drive API documentation 中找不到任何方法来执行此操作。 。我只知道如何从屏幕截图中
我是一名优秀的程序员,十分优秀!