gpt4 book ai didi

java - 如何正确使用Condition来等待和notifyAll而不丢失通知?

转载 作者:行者123 更新时间:2023-12-02 11:04:29 25 4
gpt4 key购买 nike

我有一组任务,它们分为两部分enquedeque以确保合格(多个)运行首先和然后其他按顺序限定(类似于优先级队列)。

enque中,将检查任务,并且仅执行合格任务,而其他不合格任务将被阻止。

deque内,完成的任务将从queue中删除,然后notifyAll被阻塞(实际上是所有其他线程选择<强>合格)。

这是我想要实现的目标的简化演示:

class MyTaskQueue {
private static final Object THE_QUEUE_LOCK = new Object();
public static Map<String, ReentrantLock> taskGroupLock = new HashMap<>();
public static Map<String, Condition> taskGroupCondition = new HashMap<>();

public static void enque(String name, String taskId) {
synchronized (THE_QUEUE_LOCK) {
taskGroupLock.putIfAbsent(name, new ReentrantLock());
taskGroupCondition.putIfAbsent(name, taskGroupLock.get(name).newCondition());
}
synchronized (taskGroupLock.get(name)) {
while (true) {
if (isValid(taskId)) {
break; // Go!!;
} else {
try {
taskGroupCondition.get(name).wait(); // blocked if it's not allowed;
} catch (InterruptedException ignored) {
ignored.printStackTrace();
}
}
}
}
}

public static void deque(String name, String taskId) {
if (taskGroup.containsKey(name) && taskGroup.get(name).contains(taskId)) {
synchronized (THE_QUEUE_LOCK) {
taskGroup.get(name).remove(taskId);
if (taskGroup.get(name).isEmpty()) {
taskGroup.remove(name);
}
synchronized (taskGroupLock.get(name)) {
taskGroupCondition.get(name).notifyAll();
}
}
}
}

}

目前,尽管我检查了所有其他任务(至少大多数)都已正确阻止,但只会执行第一个任务。

但是当我检查taskGroupCondition.get(name)时,firstWaiterlastWaiter都是null

我在这里错过了什么?

任何帮助将不胜感激。

最佳答案

如果我理解正确的话,您问的是以下问题:

  1. 并行运行的线程来自 MyTaskQueue开始运行的权限(通过 enque 方法)。
  2. enque方法MyTaskQueue阻塞,直到请求运行的任务获得资格。
  3. 每个合格的线程声明 MyTaskQueue通过调用 deque 它已结束运行方法。
  4. deque方法通知所有其他任务,以便检查哪些任务符合条件,然后开始运行。

然后我可以看到以下解决方案:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public class MyTaskQueue {

private final Map<String, Set<String>> runningTasks;
private String qualifiedTaskId;

public MyTaskQueue(String initialQualifiedTaskId) {
runningTasks = new HashMap<>();
qualifiedTaskId = initialQualifiedTaskId;
}

private synchronized boolean isValid(String taskId) {
return qualifiedTaskId != null && taskId != null && taskId.equals(qualifiedTaskId); //Do your qualification tests here...
}

public synchronized void setQualifiedTaskId(String qualifiedTaskId) {
this.qualifiedTaskId = qualifiedTaskId;
notifyAll(); //Now that the qualification test changed, is time to notify every blocked task.
//This way, all new qualified tasks will also be started. This "notifyAll()" operation is optional.
}

public synchronized void enque(String task, String taskId) {
while (!isValid(taskId)) { //Reentrant lock.
System.out.println("Blocking unqualified task {\"" + task + "\", \"" + taskId + "\"}...");
try { wait(); } catch (InterruptedException ie) { /*Handle the exception...*/ }
}
runningTasks.putIfAbsent(task, new HashSet<>());
runningTasks.get(task).add(taskId);
System.out.println("Starting qualified task {\"" + task + "\", \"" + taskId + "\"}...");
}

//Optional method. Might be needed for example if a Thread
//wants to check if another task is currently running...
public synchronized boolean isRunning(String task, String taskId) {
return runningTasks.containsKey(task) && runningTasks.get(task).contains(taskId);
}

public synchronized void deque(String task, String taskId) {
if (isRunning(task, taskId)) { //Reentrant lock.

//Cleanup:
runningTasks.get(task).remove(taskId);
if (runningTasks.get(task).isEmpty())
runningTasks.remove(task);

//Notify all blocked tasks:
notifyAll();
}
}

public static void main(final String[] args) {
MyTaskQueue q = new MyTaskQueue("qualified");
Random rand = new Random();
new MyThread(q, "Task1", "qualified222", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task2", "qualified222", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task3", "qualified", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task4", "qualified", 2500 + rand.nextInt(500)).start();
new MyThread(q, "Task5", "foreverBlocked", 2500 + rand.nextInt(500)).start();
try { Thread.sleep(3000); } catch (InterruptedException ie) { /*Handle the exception...*/ }
synchronized (q) {
System.out.println("Qualifying tasks of id \"qualified222\"...");
q.setQualifiedTaskId("qualified222"); //Reentrant lock.
}
//Execution of main method never ends, because of the forever blocked task "Task5".
//The "Task5" still runs while waiting for permission... See MyThread for details...
}
}

然后是MyThread :

public class MyThread extends Thread {
private final String task, taskId;
private final int actionTime; //Dummy uptime to simulate.
private final MyTaskQueue q;

public MyThread(MyTaskQueue q, String task, String taskId, int actionTime) {
this.q = q;
this.task = task;
this.taskId = taskId;
this.actionTime = actionTime;
}

@Override
public void run() {
q.enque(task, taskId); //Wait for permission to run...
System.out.println("Task {\"" + task + "\", \"" + taskId + "\"} is currently running...");

//Now lets actually execute the task of the Thread:
try { Thread.sleep(actionTime); } catch (InterruptedException ie) { /*Handle the exception.*/ }

q.deque(task, taskId); //Declare Thread ended.
}
}

MyThread是执行所需实际操作的类。

为简单起见,我认为如果任务 id 等于变量(即 qualifiedTaskId ),则该任务是合格的。

还有一个main测试代码的方法。

遵循示例输出(并且我对行进行了编号):

  1. 阻止不合格任务 {"Task1", "qualified222"}…
  2. 阻止不合格的任务 {"Task5", "blocked"}...
  3. 正在启动合格任务 {"Task4", "qualified"}...
  4. 任务 {"Task4", "qualified"} 当前正在运行...
  5. 正在启动合格任务 {"Task3", "qualified"}...
  6. 任务 {"Task3", "qualified"} 当前正在运行...
  7. 阻止不合格任务 {"Task2", "qualified222"}…
  8. 阻止不合格任务 {"Task2", "qualified222"}…
  9. 阻止不合格的任务 {"Task5", "blocked"}...
  10. 阻止不合格任务 {"Task1", "qualified222"}…
  11. 阻止不合格任务 {"Task1", "qualified222"}…
  12. 阻止不合格的任务 {"Task5", "blocked"}...
  13. 阻止不合格任务 {"Task2", "qualified222"}…
  14. ID为“qualified222”的合格任务...
  15. 正在启动合格任务 {"Task2", "qualified222"}...
  16. 任务 {"Task2", "qualified222"} 当前正在运行...
  17. 阻止不合格的任务 {"Task5", "blocked"}...
  18. 正在启动合格任务 {"Task1", "qualified222"}...
  19. 任务 {"Task1", "qualified222"} 当前正在运行...
  20. 阻止不合格的任务 {"Task5", "blocked"}...
  21. 阻止不合格的任务 {"Task5", "blocked"}…

如您所见,第 1 行到第 7 行是每个线程的初始消息。
然后,第 8 行到第 10 行被调用,因为合格的任务结束了(因此它们被重新阻塞)。
然后,第 11 到 13 行被调用,因为另一个合格的任务结束了(因此它们被重新阻塞)。
然后,在第 14 至 19 行,资格测试发生变化,新的合格任务开始运行。还有一个任务( Task5 )尚未合格。
最后,由于任务 id 等于 "qualified222" 的合格任务,因此调用第 20 至 21 行。结束。

关于java - 如何正确使用Condition来等待和notifyAll而不丢失通知?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51088940/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com