gpt4 book ai didi

java - ReentrantLock 线程随机终止

转载 作者:行者123 更新时间:2023-12-03 12:58:39 30 4
gpt4 key购买 nike

我一直在做一个关于 Java 多线程的学校作业。我遇到的一个任务是,我们需要在不同的组中创建多个线程,一旦每个组中有 4 个线程,只有这样才能释放它们协同工作,否则它们必须被搁置/等待。例如:

  • 线程 a,b,c 加入第 7 组,它们都被搁置/等待。
  • 线程 d 加入组 7,所有四个线程 (a,b,c,d) 都被通知要终止。
  • 线程 e,f,g,h,i 加入组 8,在这种情况下,e,f,g,h 将在线程 i 等待时发出终止信号。
  • 线程j加入第7组,等待。

  • 这是我完成的一般任务。我正在处理的任务要求我们释放一组的 INITIAL 前 4 个线程,其余的应该等到前面的 4 个线程调用完成()。

    例如,3 个线程加入组 65,它们被置于等待状态。另一个线程加入组 65,所有 4 个线程一起被释放。现在 4 个线程正在工作(终止)。现在线程 e,f,g,h,i,j,k,l 加入组 65。所有线程都被放置等待直到 e,f,g,h 调用了finished()方法。

    这是我到目前为止所做的:

    ExtrinsicSync.java:
    import java.util.HashMap;
    import java.util.concurrent.locks.ReentrantLock;

    public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;

    ExtrinsicSync() {
    groupThreadCount = new HashMap<>();
    monitor = new ReentrantLock();
    }

    @Override
    public void waitForThreadsInGroup(int groupId) {
    monitor.lock();

    if (!groupThreadCount.containsKey(groupId))
    groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

    ConditionWrapper condWrapper = groupThreadCount.get(groupId);
    condWrapper.setValue(condWrapper.getValue() + 1);

    if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
    {
    condWrapper.getCondition().signalAll();
    condWrapper.setInitialStatus(false);

    System.out.println("Terminating group: " + groupId + "FROM INITIAL STATE: " + ++count);
    } else {
    System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
    try { condWrapper.getCondition().await(); }
    catch (InterruptedException e) { e.printStackTrace(); }

    }

    monitor.unlock();
    }

    @Override
    public void finished(int groupId) {
    monitor.lock();
    ConditionWrapper condWrapper = groupThreadCount.get(groupId);

    if(!condWrapper.getInitialStatus())
    {
    condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
    System.out.println("Group: " + groupId + "FINISHED COUNT: " + condWrapper.getFinishedCount());
    if(condWrapper.getFinishedCount() == 4)
    {
    condWrapper.setFinishedCount(0);
    condWrapper.getCondition().signalAll();
    System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
    }
    }
    monitor.unlock();
    }

    ExtrinsicSyncTest.java:
    import org.junit.Test;

    import java.util.EnumMap;

    class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
    this.group = group;
    this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
    }

    public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

    int nThreads = 22;

    Thread t[] = new Thread[nThreads];
    final ExtrinsicSync s1 = new ExtrinsicSync();

    for(int i = 0; i < nThreads/2; i++)
    (t[i] = new Thread(new TestTask1(66, s1))).start();

    for(int i = nThreads/2; i < nThreads; i++)
    (t[i] = new Thread(new TestTask1(70, s1))).start();

    for (Thread ti : t)
    {
    try { ti.join(100); }
    catch (Exception e) { System.out.println(e); }
    }

    EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

    for (Thread.State s : Thread.State.values())
    threadsInThisState.put(s, 0);

    for (Thread ti : t)
    {
    Thread.State state = ti.getState();
    int n = threadsInThisState.get(state);
    threadsInThisState.put(state, n + 1);
    }

    System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
    }

    ConditionWrapper.java:
    import java.util.concurrent.locks.Condition;

    public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
    this.cond = condition;
    this.value = 0;
    this.finishedCount = 0;
    this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
    return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
    return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
    }

    我遇到的问题是我能够释放每个组的前四个线程,但不知何故,有 2 个线程随机终止,我无法弄清楚发生了什么。例如,上面的 22 个线程测试用例分为两组,只有 8 个线程应该被终止,而其余的则等待。

    但是这里有 10 个线程被终止。我不明白发生了什么。我已尽可能将代码精简到最低限度。

    最佳答案

    问题在于,对于非初始线程 (getInitialStatus==false),您不会向其他线程发出信号,但当您达到四个线程时仍会终止它们。所以这就是发生的事情:

  • 前三个线程增加计数并等待
  • 第四个线程达到 count == 4 并设置 initial = false 并向所有其他线程发送信号并将计数设置为零
  • 接下来的三个线程将计数增加一
  • 8 个线程达到计数 == 4 并被终止。由于 getInitialStatus==false 此线程不会通知其他线程。

  • 所以 4*2 个线程 + 2 个线程被终止。正是您在测试中看到的计数。

    这是实现这一点的潜在方法:
  • 在每个线程或任务中使用标志 canExecute
  • 使用方法 calculateState 计算当前状态,如果允许线程执行,则将标志设置为 true。
  • 将所有正在等待的线程存储在列表或类似的东西中

  • 所以你的任务看起来像这样:
    Task
    boolean canExeute

    然后 waitForThreadsInGroup 方法看起来像这样:
    waitForThreadsInGroup
    monitor.lock();
    add task to list
    calculateTaskState
    condition.notifyAll
    while( ! task.canExcecute )
    {
    condition.await.
    }

    monitor.unlock();

    完成方法看起来很相似:
      finish
    monitor.lock();
    decrement finish count
    calculateTaskState
    condition.notifyAll
    monitor.unlock();

    并计算TaskState
    calculateTaskState
    if( finishCount == 0)
    {
    if( taskList.size >= 4 )
    {
    set 4 tasks in this list to can execute and remove them from the list
    }
    }

    所以诀窍是将逻辑分为三个步骤:
  • 操作,例如减少完成计数
  • 新状态的计算。并决定每个线程是否允许执行
  • 以及线程的等待。每个线程都需要等待自己的标志
  • 关于java - ReentrantLock 线程随机终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61054474/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com