gpt4 book ai didi

java - 服务中的 parking 线

转载 作者:行者123 更新时间:2023-12-03 12:54:12 26 4
gpt4 key购买 nike

我正在尝试线程驻留,并决定构建某种服务。看起来是这样的:

public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles

private final CountDownLatch stopLatch;
private final Object parkBlocker = new Object();
private volatile boolean stopped;
private final Thread[] workers;

public TestService(int parallelizm) {
stopLatch = new CountDownLatch(parallelizm);
workers = new Thread[parallelizm];
for (int i = 0; i < parallelizm; i++) {
workers[i] = new Thread(() -> {
try {
while (!stopped) {
logger.debug("Parking " + Thread.currentThread().getName());
LockSupport.park(parkBlocker);
logger.debug(Thread.currentThread().getName() + " unparked");
}
} finally {
stopLatch.countDown();
}
});
}
}

public void start() {
Arrays.stream(workers).forEach(t -> {
t.start();
logger.debug(t.getName() + " started");
});
}

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
boolean stoppedSuccefully = false;
this.stopped = true;
unparkWorkers();
if (stopLatch.await(timeout, unit)) {
stoppedSuccefully = true;
}
return stoppedSuccefully;
}

private void unparkWorkers() {
Arrays.stream(workers).forEach(w -> {
LockSupport.unpark(w);
logger.debug("Un-park call is done on " + w.getName());
});
}
}

我面临的问题是,如果我随后按以下方式测试此服务,请执行以下操作:
public static void main(String[] args) = {
while(true) {
TestService service = new TestService(2);
service.start();
if (!service.stop(10000, TimeUnit.MILLISECONDS))
throw new RuntimeException();
}
}

有时我会出现以下行为:
14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
at com.pack.age.Test$.main(Test.scala:12)
at com.pack.age.Test.main(Test.scala)

该线程在 parking 场上闲逛:
"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000720739a68> (a java.lang.Object)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.pack.age.TestService.lambda$new$0(TestService.java:27)
at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)

我在服务中看不到公园与公园之间的任何比赛。此外,如果在 unpark之前调用 park,则保证 park不会阻塞(这就是javadocs所说的)。

也许我滥用 LockSupport::park。您能提出任何建议吗?

最佳答案

这与记录器无关,尽管它的用法使问题浮出水面。就这么简单,您就有了比赛条件。在解释竞争条件之前,您需要首先了解LockSupport::unpark文档中的一些知识:

Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.



第一点是解释 here。简短的版本是:如果您已经启动了 thread,但尚未调用 park,并且在这段时间内(线程的 startpark之间),则其他一些线程在第一个线程上调用 unpark:该线程根本不会停放。许可证将立即可用。可能是这张小图使它更加清晰:
(ThreadA)  start ------------------ park --------- ....

(ThreadB) start ----- unpark -----

注意 ThreadBunpark(ThreadA)调用 ThreadAstart的时间段之间如何调用 park。这样,当 ThreadA到达 park: 时,可以保证不会像文档中所述那样阻止

同一文档的第二点是:

This operation is not guaranteed to have any effect at all if the given thread has not been started.



让我们通过绘图来看看:
Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 
ThreadA调用 park后,它将永远挂起,因为 ThreadB再也不会在其上调用 unpark了。注意,对 unpark的调用是在 ThreadA开始之前进行的(与前面的示例不同)。

这正是您的情况:

LockSupport.unpark(w);调用 unparkWorkers之前先调用 t.start();(来自 public void start(){...})。用简单的话来说-您的代码甚至在开始使用之前就在这两个 unpark上调用 workers,因此,当它们最终到达 park时-他们被卡住了,没有人能够对它们进行 unpark。您看到的是 logger而不是 System::out的事实,最有可能与您使用 println时的表情有关–引擎盖下有一个 synchronized方法。

事实上, LockSupport恰好提供了证明这一点的语义。为此,我们需要(为简单起见: SOProblem service = new SOProblem(1);)
static class ParkBlocker {

private volatile int x;

public ParkBlocker(int x) {
this.x = x;
}

public int getX() {
return x;
}
}

现在我们需要将其插入适当的方法中。首先标记我们已调用 unpark的事实:
private void unparkWorkers() {
Arrays.stream(workers).forEach(w -> {
LockSupport.unpark(w);
logger.debug("Un-park call is done on " + w.getName());
});
/*
* add "1" to whatever there is already in pb.x, meaning
* we have done unparking _also_
*/
int y = pb.x;
y = y + 1;
pb.x = y;
}

然后在循环结束后重置标志:
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
boolean stoppedSuccefully = false;
stopped = true;
unparkWorkers();
if (stopLatch.await(timeout, unit)) {
stoppedSuccefully = true;
// reset the flag
pb.x = 0;
}
return stoppedSuccefully;
}

然后将构造函数更改为标记线程已启动:
  .....
while (!stopped) {
logger.debug("Parking " + Thread.currentThread().getName());
// flag the fact that thread has started. add "2", meaning
// thread has started
int y = pb.x;
y = y + 2;
pb.x = y;
LockSupport.park(pb);
logger.debug(Thread.currentThread().getName() + " unparked");
}

然后,当线程冻结时,您需要检查标志:
 public static void main(String[] args) throws InterruptedException {
while (true) {
SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
service.start();
if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
service.debug();
throw new RuntimeException();
}
}
}
debug方法在哪里:
public void debug() {
Arrays.stream(workers)
.forEach(x -> {
ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
if (pb != null) {
System.out.println("x = " + pb.getX());
}
});
}

当问题重现时,您先调用 unpark,然后再调用 park,这是在将 x = 3作为输出时发生的。

关于java - 服务中的 parking 线,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50370522/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com