gpt4 book ai didi

java - 如何在没有队列的情况下实现此并发结构?

转载 作者:行者123 更新时间:2023-12-03 13:07:39 25 4
gpt4 key购买 nike

我的应用程序中遇到一种情况,事件进入并且处理事件的线程(信令线程)必须向另一个线程(工作线程)发出信号,到目前为止,该线程处于空闲状态,它可以运行一些代码。工作线程完成后,应等待再次发出信号。当工作线程正在工作时,事件可能会到达。在这种情况下,它应该继续运行并立即工作。工作线程执行的一项操作足以处理任何数量的传入事件,因此无需为每个事件进行一次操作,而只需在每个事件后尽快进行一次操作即可。正确行为示例:

event comes in
worker thread starts work
worker thread finishes work
event comes in
worker thread starts work
event comes in
event comes in
worker thread finishes work
worker thread starts work
worker thread finishes work

4个项目,3个工作周期。不幸的是,但不可避免的要求信令线程在处理事件时不能阻塞。目前,我已经使用BlockingQueue实现了此功能,即使内容没有意思甚至没有看,它也具有填满自身的毫无意义的副作用。我原本希望能够使用CountDownLatch或CyclicBarrier或类似工具来完成这项工作,但我一直找不到方法。这是我的实现:
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

private static final class MyBarrier {
private BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
void await() throws InterruptedException {
queue.take();
queue.clear();
}
void signal() {
queue.add(true);
}
}

private static Random random = new Random(0);

private static void sleepForMax(int maxMillis) {
sleep(random.nextInt(maxMillis));
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public static void main(String[] args) {
MyBarrier myBarrier = new MyBarrier();
final ExecutorService singallingThread = Executors.newSingleThreadExecutor();
singallingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
sleepForMax(1_000); // simulate period between events arriving
myBarrier.signal();
System.out.println("Signalling work to be done");
}
System.out.println("Thread interrupted");
});
final ExecutorService workingThread = Executors.newSingleThreadExecutor();
workingThread.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println("Waiting for work");
myBarrier.await();
} catch (InterruptedException e) {
break;
}
System.out.println("Doing work...");
sleepForMax(3_000); // simulate work being done
System.out.println("Work done");
}
System.out.println("Thread interrupted");
});
sleep(10_000);
singallingThread.shutdownNow();
workingThread.shutdownNow();
}

}

有什么更好的方法来做到这一点?

最佳答案

当我在使用Phaser的实现中运行您的代码时,更改了 sleep 时间,以便每800毫秒发生一次信号发送,而处理则需要1000毫秒,我得到了此输出:

00008: Waiting for work
00808: Signalling work to be done
00808: Doing work... <-- worker starts working
01608: Signalling work to be done <-- signal came, so there's more work
01808: Work done
01809: Waiting for work <-- waits for work...
02409: Signalling work to be done <-- ...for 600 ms, until the next signal
02409: Doing work...

(左边的数字是从开始算起的毫秒数。此外,您可以在代码中随机延迟地重现它,但这在这里很难重现和看到。)

如果我正确理解,那是错误的。例如。想象一下,如果信号不再到来会发生什么。

您的代码可能可以针对您的具体情况进行此调整:
private static final class MyBarrierWithPhaser {

private final Phaser phaser = new Phaser(1);
private int lastObservedPhase; // Phaser has initial phase 0

void await() throws InterruptedException {
// only works for 1 producer 1 worker; lastObservedPhase is kind of thread-local
lastObservedPhase = phaser.awaitAdvanceInterruptibly(lastObservedPhase);
}

void signal() {
phaser.arrive();
}
}

这样,worker记录了它前进到的最后一个阶段,如果信号线程在下一个 awaitAdvanceInterruptibly之前“到达”,那么Phaser阶段将被更新,并且当worker尝试使用陈旧阶段等待时,它将立即进行;如果信号线程没有在 awaitAdvanceInterruptibly之前到达,则工作线程将等待直到信号线程最终到达。

使用更简单的同步原语,我可以想到如何使用 synchronized- wait()- notify()机制来实现它:
private static final class MyBarrierWithSynchronized {

private boolean hasWork = false;

synchronized void await() throws InterruptedException {
while (!hasWork) {
wait();
}
hasWork = false;
}

synchronized void signal() {
hasWork = true;
notifyAll(); // or notify() if we are sure there is 1 signal thread and 1 worker thread
}
}

它有两个缺点:如果线程正在等待输入 await(),它将不会被中断。另外,有些人不喜欢在 this上进行同步,因此为了简短起见,我将其保留下来。可以使用 java.util.concurrent.*类似物重写此代码,此实现不会同时具有以下两个缺点:
private static final class MyBarrierWithLock {

private boolean hasWorkFlag = false;

private final Lock lock = new ReentrantLock();
private final Condition hasWorkCond = lock.newCondition();

void await() throws InterruptedException {
lock.lockInterruptibly();
try {
while (!hasWorkFlag) {
hasWorkCond.await();
}
hasWorkFlag = false;
} finally {
lock.unlock();
}
}

void signal() {
lock.lock();
try {
hasWorkFlag = true;
hasWorkCond.signalAll(); // or signal() if we are sure there is 1 signal thread and 1 worker thread
} finally {
lock.unlock();
}
}
}

关于java - 如何在没有队列的情况下实现此并发结构?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58829675/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com