- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
ExtThreadPoolExecutor作用是对线程池的增强,如在初始化线程池时、在线程执行前、执行后等处可添加自定义逻辑。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExtThreadPoolExecutor extends ThreadPoolExecutor{
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
init();
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
init();
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
init();
}
public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
init();
}
private void init(){
System.out.println("ExtThreadPoolExecutor init......");
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("beforeExecute...... begin" );
super.beforeExecute(t, r);
System.out.println("beforeExecute...... end" );
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("afterExecute...... begin" );
super.afterExecute(r, t);
System.out.println("afterExecute...... end" );
}
}
通过BlockingQueue存放任务线程,该处使用生产者、消费者模式。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class WorkQueue {
private volatile static BlockingQueue<WorkEvent> queue;
private WorkQueue(){}
/**
* 初始化队列,延迟初始化,其实也可使用内部类单例模式
*/
private static void init(){
if(queue == null){
System.out.println("WorkQueue.queue null init........");
synchronized (WorkQueue.class) {
System.out.println("WorkQueue.queue after synchronized still null init........");
if (queue == null) {
queue = new LinkedBlockingDeque<WorkEvent>();
}
}
}
}
public static void putWorkEvent(WorkEvent workEvent){
init();
try {
queue.put(workEvent);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("WorkQueue.putWorkEvent fail........");
}
}
public static BlockingQueue<WorkEvent> getQueue() {
return queue;
}
}
业务处理
public class EventHandler {
/**
* 处理业务
* @param workEvent
*/
public static void handle(WorkEvent workEvent){
System.out.println("正在处理,workNo=[" + workEvent.getWorkNo() + "]");
}
}
工作线程
消费者端,阻塞接收消息,并将消息传给实际需要者。
public class WorkThread implements Runnable{
@Override
public void run() {
while (true) {
try {
WorkEvent workEvent = WorkQueue.getQueue().take();
System.out.println("ThreadName[" + Thread.currentThread().getName() + "], 获取到workEvent,workNo=[" + workEvent.getWorkNo() + "], ready handle");
EventHandler.handle(workEvent);
System.out.println("ThreadName[" + Thread.currentThread().getName() + "], 获取到workEvent,workNo=[" + workEvent.getWorkNo() + "], finish handle");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消息实体
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
public class WorkEvent implements Serializable{
private static final long serialVersionUID = -1739230985770176506L;
/**
* 任务编号
*/
private String workNo;
/**
* 执行次数
*/
private AtomicInteger num;
public WorkEvent(String workNo) {
this.workNo = workNo;
this.num = new AtomicInteger(0);
}
public String getWorkNo() {
return workNo;
}
public void setWorkNo(String workNo) {
this.workNo = workNo;
}
public AtomicInteger getNum() {
return num;
}
public void setNum() {
this.num.incrementAndGet();
}
}
调用示例:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class StartWork {
public static void main(String[] args) {
System.out.println("准备放任务线程");
int workNum = 6;
for (int i = 0; i < workNum; i++) {
WorkEvent workEvent = new WorkEvent("任务线程" + i);
WorkQueue.putWorkEvent(workEvent);
}
// 初始化线程池
ExtThreadPoolExecutor executor = new ExtThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
// 先准备工作线程
System.out.println("准备五个工作线程");
executor.execute(new WorkThread());
executor.execute(new WorkThread());
executor.execute(new WorkThread());
executor.execute(new WorkThread());
executor.execute(new WorkThread());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10s后 。。。 准备放任务线程");
for (int i = 0; i < workNum; i++) {
WorkEvent workEvent = new WorkEvent("10s 后 任务线程" + i);
WorkQueue.putWorkEvent(workEvent);
}
}
}
结果示例
代码大体流程:消息定义成实体WorkEvent,放入WorkQueue中,然后由ExtThreadPoolExecutor线程池开启接收端线程WorkThread,由WorkThread获取消息,并通知实际需要者EventHandler,EventHandler处理消息。
这是正确的吗? ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
我只是对我编写的一些代码感到非常困惑。我惊讶地发现: with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
我正在尝试同时使用 InheritableThreadLocal 和 ThreadPoolExecutor。 这是因为 ThreadPoolExecutor 为每个池重用线程(毕竟它是一个池),这意味
concurrent.futures.ThreadPoolExecutor 通过 传递 function 到执行器 executor.submit(my_function) 像这样: def my_f
将一个简单的测试程序放在一起,应该并行执行一些任务。每次我们提交6个任务,等待完成。然后,又提交了一组任务。 import java.util.concurrent.*; public class
我正在运行一段 python 代码,其中多个线程通过线程池执行程序运行。每个线程都应该执行一项任务(例如获取网页)。我想要做的是终止所有线程,即使其中一个线程失败。例如: with ThreadPoo
我有一个使用阻塞队列的ThreadPoolExecutor,并且正在尝试调试一个问题,在该问题中,我怀疑任务在ThreadPoolExecutor的队列中停留的时间太长,无法执行。我正在尝试验证这一理
我正在尝试使用 futures backport 包在 Python 中使用 ThreadPoolExecutor。然而,问题是所有线程都是同时执行的,所以没有实际的池化发生。更具体地说,我得到了该函
我有两个 list : a = [1, 2, 3, 4] b = [9, 8, 7, 6] 我希望将这两个列表的每个组合作为参数传递给我正在执行多线程处理的函数: def test(hello, wo
当我们谈论ThreadPoolExecutor时,核心池大小和最大池大小之间到底有什么区别? 可以用例子来解释吗? 最佳答案 来自this blog post : Take this example.
我对“concurrent.futures”的并行处理相当陌生,我正在测试一些简单的实验。我编写的代码似乎可以工作,但我不确定如何存储结果。我尝试创建一个列表(“ future ”)并将结果附加到该列
我审查了多线程,并尝试实现一个创建单独线程来运行收集进程的应用程序。该过程中的主要方法需要一个变量数组列表,我正在尝试找出一种将数组列表传递给每个线程的方法。 ApplicationContext c
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
我需要读取一个大的 csv 文件(328 MB)并对其进行处理。每行的处理还包括调用 Web 服务。 我是第一次使用ThreadPoolExecutor。我的逻辑是,我将从 csv 中每 100 行吐
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED def div_zero(x): print('I
我有一个带有 run 方法的类,该类的 main 方法中的计时器正在使用以下代码调用该类: Timer timer = new Timer(); timer.scheduleAtFixedRate(n
尝试调试竞争条件,其中我们的应用程序的轮询器线程之一永远不会返回,导致 future 的轮询器永远不会被调度。用抽象术语来说,在捕获问题时隐藏我们的业务逻辑,这就是我们的代码路径。 我们必须更新远程服
下面的错误是什么意思?我怎样才能恢复它? Exception in thread "UserActionProcessor-8811" java.util.concurrent.RejectedExe
我正在致力于增强现有的 Java 应用程序。该应用程序是一个消息处理器,每天处理数百万条消息。它基本上是使用 Core Java 编写的,线程和队列是使用 Collection 类实现的。 在此应用程
我想更新使用用户定义线程池的旧代码。我想使用 java ThreadPoolExecutor,但问题是发送到线程池的请求不是可运行的。有什么方法可以将 ThreadPoolExecutor 与现有请求
我是一名优秀的程序员,十分优秀!