- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我的多线程类应该对 ClassA
类的多个对象执行三个操作 - 操作 1
、操作 2
和 操作 3
,其中每种类型的操作都依赖于先前的操作。为此,我尝试使用多个 BlockingQueue 和 ExecutorService 来实现生产者-消费者模式。
final ExecutorService executor = ForkJoinPool.commonPool();
final BlockingQueue<ClassA> operationOneQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationTwoQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationThreeQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> resultQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
操作的实现如下:
void doOperationOne() throws InterruptedException {
ClassA objectA = operationOneQueue.take();
objectA.operationOne();
operationTwoQueue.put(objectA);
}
其中每种类型的操作都有其自己相应的方法,以及其“自己的”入队列和出队列。每个操作方法都会调用 ClassA 对象上适当的方法。 doOperationThree
方法将 ClassA
对象放入 resultQueue
中,这意味着它们已被完全处理。
首先,我将要操作的所有 ClassA
对象填充到 operationOneQueue
中。然后,我尝试将可执行任务分配给 ExecutorService,如下所示:
while (resultQueue.size() < NO_OF_CLASS_A_OBJECTS) {
executor.execute(() -> {
try {
doOperationOne();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
doOperationTwo();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
doOperationThree();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
运行我的程序时,我收到一个java.util.concurrent.RejectedExecutionException
。
Operation1: ClassA object 0
Operation2: ClassA object 0
Operation1: ClassA object 1
Operation3: ClassA object 0
....
Operation1: ClassA object 46
Operation2: ClassA object 45
Operation3: ClassA object 45
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Queue capacity exceeded
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:912)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.lockedPush(ForkJoinPool.java:867)
at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1911)
at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930)
at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462)
at concurrent.operations.Program1.main(Program1.java:96)
我做错了什么?如何在线程池不过度饱和的情况下实现这一目标?
编辑:全面披露——这是有一些要求的作业。 1. 我必须使用 ForkJoinPool.commonPool()
并且不能自己设置线程数,2. 我必须使用消费者-生产者模式,3. 我不能修改 ClassA
。
最佳答案
我真的很喜欢做并发的事情,所以我尝试编写它。我确实使用了 CompletableFuture,它 a) 默认情况下在 ForkJoinPool.commonPool 中运行,b) 使实际处理变得非常简单:
while (true) {
final ClassA nextOperation = queue.take();
CompletableFuture.runAsync(nextOperation::operationOne)
.thenRun(nextOperation::operationTwo)
.thenRun(nextOperation::operationThree)
.thenRun(() -> resultQueue.add(nextOperation));
}
这将从队列中获取ClassA
对象并同时执行它们的所有操作,但按顺序。
您确实遗漏了任务的来源以及是否需要消费者终止。通常您不想这样做,但这确实会让事情变得更复杂。
private static final int COUNT = 10;
private static final Random RANDOM = new Random();
public static void main(String[] args) throws ExecutionException, InterruptedException {
BlockingQueue<ClassA> runnables = new ArrayBlockingQueue<>(COUNT);
BlockingQueue<ClassA> finished = new ArrayBlockingQueue<>(COUNT);
// start producer
ExecutorService createTaskExecutor = Executors.newSingleThreadExecutor();
createTaskExecutor.submit(() -> fillQueue(runnables));
// wait for all consumer tasks to finish
while (finished.size() != COUNT) {
try {
// we need to poll instead of waiting forever
// because the last tasks might still be running
// while there are no others to add anymore
// so we need to check again if all have finished in the meantime
final ClassA nextOperation = runnables.poll(2, TimeUnit.SECONDS);
if (nextOperation != null) {
CompletableFuture.runAsync(nextOperation::operationOne)
.thenRun(nextOperation::operationTwo)
.thenRun(nextOperation::operationThree)
.thenRun(() -> finished.add(nextOperation));
}
} catch (InterruptedException e) {
System.err.println("exception while retrieving next operation");
// we will actually need to terminate now, or probably never will
throw e;
}
}
System.out.printf("finished tasks (%d):%n", finished.size());
for (ClassA classA : finished) {
System.out.printf("finished task %d%n", classA.designator);
}
createTaskExecutor.shutdown();
}
private static void fillQueue(BlockingQueue<ClassA> runnables) {
// start thread filling the queue at random
for (int i = 0; i < COUNT; i++) {
runnables.add(new ClassA(i));
try {
Thread.sleep(RANDOM.nextInt(1_000));
} catch (InterruptedException e) {
System.err.println("failed to add runnable");
}
}
}
由于您没有提供ClassA
,所以我使用了这个。它包含一个标识符,以便您可以跟踪哪个程序在什么时间运行。
class ClassA {
private static final Random RANDOM = new Random();
public final int designator;
public ClassA(int i) {
designator = i;
}
public void operationOne() {
System.out.printf("%d: operation 1%n", designator);
sleep();
}
public void operationTwo() {
System.out.printf("%d: operation 2%n", designator);
sleep();
}
public void operationThree() {
System.out.printf("%d: operation 3%n", designator);
sleep();
}
private static void sleep() {
try {
Thread.sleep(RANDOM.nextInt(5_000));
} catch (InterruptedException e) {
System.err.println("interrupted while executing task");
}
}
}
关于java - 不断向 ExecutorService 提交 Runnable 任务,直到工作完成,得到 java.util.concurrent.RejectedExecutionException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60268703/
好吧,我的问题听起来很困惑,但实际上很简单。我有一个 Runnable 启动另一个 Runnable 实例。所以基本上: 可运行 1 -> 可运行 2 runnable1 是在 runnable2 还
我正在尝试使用我的单例的 Handler.post() 方法从另一个 runnable 运行一个 runnable,但是直到原始 runnable 完成后才运行第二个 runnable。在下面的示例代
为什么下面的第一个例子不起作用? run(R::new); 方法 R.run 未被调用。 run(new R()); 方法 R.run 被调用。 这两个示例都是可编译的。 public class C
为什么下面的代码不起作用?基本上,这是一个更困难的程序的简化版本,在该程序中,我试图创建一个可运行的初始屏幕,其中包含一些选择,然后有链接到不同可运行项的按钮,但这并没有按照我的预期运行。 impor
我有一个带有主选项卡 Activity 的 Android 应用,以及各个选项卡中的多个 Activity 。在我的主要 Activity 的 onCreate() 中,我有一个创建列表的可运行文件,
我正在使用 Mockito 进行测试。我有一个回调接口(interface): interface Callback { void onMessageRetrieved(String mess
这个问题在这里已经有了答案: Does postDelayed cause the message to jump to the front of the queue? (1 个回答) 关闭 7 年
我想将 runnable 发布到 runnable 内的 View 对象,目前我被困在这里。 var runnable = Runnable { if(numLinesToDraw
假设我有一个 ExecutorService,我用它来运行 Job 对象,这是一个扩展 Runnable 的类。我希望我的 Job 对象并行运行,除了让 2 个 Job 对象具有相同的 job.get
下面是该类的源代码。 我想验证如何 shutdownNow()适用于未提交的任务。我在下面的代码中遇到的问题是 shutdownNow()返回 List而不是 List我已经提交了 List包含提交的
我正在尝试找出如何从多线程应用程序中获得最大性能。 我有一个这样创建的线程池: ExecutorService executor = Executors.newFixedThreadPool(8);
这有什么区别?请引用选项1和选项2。因为我遇到了麻烦,因为它们好像是一样的。它们运行正确 Thread ThreadPoolExecutor executor = (ThreadPoolExecuto
当任何命令在任何 ScheduledExecutorService 上以固定速率调度时,它返回 ScheduledFuture 也可以取消。但是“cancel”并不能保证命令在 cancel 返回后仍
因此,我希望在尝试保持轻资源负载的同时同时完成几件事。例如,同时播放声音和更新 GUI。让多个处理程序具有单个可运行对象或单个处理程序具有多个并行运行的可运行对象更好吗? 我知道下面的实现实际上不会同
这个问题已经有答案了: 已关闭11 年前。 Possible Duplicate: Java: “implements Runnable” vs. “extends Thread” 我只是想知道创建自
为什么 Java 的 scheduleWithFixedDelay 使用 Runnable 而不是包装 runnable 的 FutureTask? 这可以很容易地用两个不同的代码示例来展示: Sch
这个问题在这里已经有了答案: What's the difference between Activity.runOnUiThread(runnable action) and Handler.pos
我有如下三个类: 1.线程实例 public class ThreadInstance extends Thread { public ThreadInstance() { }
春天给了我下面的错误,却想不出为什么!。注意:在XML文件中没有对此的Bean定义。。我也提出了这一点,但没有为我的案件找到任何解决方案
一 Runnable 的职责 Runnable 接口非常简单,只定义了一个无参无返回值的 run 方法,源码如下。 @FunctionalInterface public interface Runn
我是一名优秀的程序员,十分优秀!