
java - How to cancel a ScheduledFuture and wait for the runnable to stop if it is in progress at the moment of cancellation?

Reposted. Author: 塔克拉玛干. Updated: 2023-11-02 19:10:52

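When a command is scheduled at a fixed rate on any ScheduledExecutorService, the call returns a ScheduledFuture that can also be cancelled. However, cancel does not guarantee that the command is no longer executing once cancel has returned, for example because the command was already in the middle of an execution when cancel was called.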
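The behavior described above can be observed with a small sketch. The class name, method name, and the three latches are hypothetical and only serve the illustration; the task blocks on a latch to simulate long-running work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and latches are illustrative only.
final class CancelDoesNotWaitDemo {

    /** Returns true if the task was still in progress when cancel(false) returned. */
    static boolean taskStillRunningAfterCancel() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);   // task has entered its body
        CountDownLatch release = new CountDownLatch(1);   // lets the task finish
        CountDownLatch finished = new CountDownLatch(1);  // task has left its body
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                started.countDown();
                try {
                    release.await();                      // simulate long-running work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.countDown();
            }, 0, 1, TimeUnit.SECONDS);

            started.await();                              // the task is now in progress
            future.cancel(false);                         // returns without waiting for the task
            boolean stillRunning = finished.getCount() == 1;
            release.countDown();                          // allow the task to complete
            finished.await();
            return stillRunning;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "still running after cancel: true"
        System.out.println("still running after cancel: " + taskStillRunningAfterCancel());
    }
}
```

Because the task cannot finish until `release` is counted down, the check right after `cancel(false)` always sees the execution still in progress.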

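For most use cases this behavior is sufficient. But I have a use case where, after cancelling, the current thread must block if the command is already in progress and wait until the command completes. In other words, the thread that called cancel must not proceed while the command is still executing. Cancelling with mayInterruptIfRunning=true is not suitable either, because I do not want to disrupt the current execution; I only need to wait for it to complete normally.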

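I did not find a way to meet this requirement with the standard JDK classes. Question 1: Am I wrong, and does such functionality actually exist?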

So I decided to implement it myself:

    import java.util.concurrent.*;

    public class GracefullyStoppingScheduledFutureDecorator implements ScheduledFuture<Object> {

        /**
         * @return a scheduled future whose "cancel" method, in addition to the standard behavior,
         * guarantees that the command is not in the middle of an execution when "cancel" returns
         */
        public static ScheduledFuture<Object> schedule(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
            CancellableCommand cancellableCommand = new CancellableCommand(command);
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
            return new GracefullyStoppingScheduledFutureDecorator(future, cancellableCommand);
        }

        private GracefullyStoppingScheduledFutureDecorator(ScheduledFuture<?> targetFuture, CancellableCommand command) {
            this.targetFuture = targetFuture;
            this.runnable = command;
        }

        private final ScheduledFuture<?> targetFuture;
        private final CancellableCommand runnable;

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            // Blocks on the command's monitor while an execution is in progress.
            runnable.cancel();
            return targetFuture.cancel(mayInterruptIfRunning);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return targetFuture.getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o) {
            return targetFuture.compareTo(o);
        }

        @Override
        public boolean isCancelled() {
            return targetFuture.isCancelled();
        }

        @Override
        public boolean isDone() {
            return targetFuture.isDone();
        }

        @Override
        public Object get() throws InterruptedException, ExecutionException {
            return targetFuture.get();
        }

        @Override
        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return targetFuture.get(timeout, unit);
        }

        private static class CancellableCommand implements Runnable {

            private final Object monitor = new Object();
            private final Runnable target;
            private boolean cancelled = false;

            private CancellableCommand(Runnable target) {
                this.target = target;
            }

            public void cancel() {
                synchronized (monitor) {
                    cancelled = true;
                }
            }

            @Override
            public void run() {
                // The monitor is held for the whole execution of the target.
                synchronized (monitor) {
                    if (!cancelled) {
                        target.run();
                    }
                }
            }

        }

    }

Question 2: Can anyone find errors in the code above?

Best answer

Question 2: Could anybody find errors in the code above?

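There is a hypothetical deadlock, which can be reproduced by the following scenario:

  1. Thread T1 holds monitor M1.
  2. The scheduled task is executing on thread T2 (holding its own monitor M2) and wants to enter M1, so T2 has to wait until T1 exits monitor M1.
  3. T1 decides to cancel the task, but because the task's monitor M2 is locked by the task itself, we end up in a deadlock.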
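The three steps above can be modelled with two plain threads and two monitors. This is only a sketch: the class and method names are hypothetical, `m2` stands in for the `monitor` field of `CancellableCommand`, and the nested `synchronized` blocks stand in for the calls to `cancel()` and `target.run()`. The threads are daemons so the JVM can still exit after the deadlock has been detected.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical reproduction; M1, M2, T1 and T2 mirror the names in the scenario.
final class CancelDeadlockDemo {

    /** Returns true if the JVM reports threads deadlocked on object monitors. */
    static boolean reproducesDeadlock() {
        Object m1 = new Object();   // monitor owned by the caller's own code
        Object m2 = new Object();   // stands in for CancellableCommand.monitor
        CountDownLatch t1HoldsM1 = new CountDownLatch(1);
        CountDownLatch t2HoldsM2 = new CountDownLatch(1);

        Thread t1 = new Thread(() -> {
            synchronized (m1) {
                t1HoldsM1.countDown();
                awaitQuietly(t2HoldsM2);
                synchronized (m2) { /* models runnable.cancel() */ }
            }
        }, "T1");
        Thread t2 = new Thread(() -> {
            synchronized (m2) {     // models CancellableCommand.run()
                t2HoldsM2.countDown();
                awaitQuietly(t1HoldsM1);
                synchronized (m1) { /* models target.run() needing M1 */ }
            }
        }, "T2");
        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.nanoTime() + 5_000_000_000L;   // poll for up to 5 seconds
        while (System.nanoTime() < deadline) {
            if (threads.findMonitorDeadlockedThreads() != null) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // prints "deadlock detected: true"
        System.out.println("deadlock detected: " + reproducesDeadlock());
    }
}
```

The latches force the interleaving from the scenario: each thread acquires its first monitor before either attempts the second, so both block permanently.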

Most likely the scenario above is unrealistic, but to rule out every possible case I decided to rewrite the code in a lock-free manner:

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.*;

    public class GracefullyStoppingScheduledFuture {

        /**
         * @return a handle whose "cancelAndBeSureOfTermination" method, in addition to the standard cancellation,
         * guarantees that the command is not in the middle of an execution when it returns
         */
        public static GracefullyStoppingScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
            CancellableCommand cancellableCommand = new CancellableCommand(command);
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
            return new GracefullyStoppingScheduledFuture(future, cancellableCommand);
        }

        private GracefullyStoppingScheduledFuture(ScheduledFuture<?> targetFuture, CancellableCommand command) {
            this.targetFuture = targetFuture;
            this.runnable = command;
        }

        private final ScheduledFuture<?> targetFuture;
        private final CancellableCommand runnable;

        public void cancelAndBeSureOfTermination(boolean mayInterruptIfRunning) throws InterruptedException, ExecutionException {
            try {
                targetFuture.cancel(mayInterruptIfRunning);
            } finally {
                // Waits here if an execution is currently in progress.
                runnable.cancel();
            }
        }

        private static class CancellableCommand implements Runnable {

            private static final int NOT_EXECUTING = 0;
            private static final int IN_PROGRESS = 1;
            private static final int CANCELLED_WITHOUT_OBSTRUCTION = 2;
            private static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3;

            private final AtomicInteger state = new AtomicInteger(NOT_EXECUTING);
            private final AtomicReference<Thread> executionThread = new AtomicReference<>();
            private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
            private final Runnable target;

            private CancellableCommand(Runnable target) {
                this.target = target;
            }

            public void cancel() throws ExecutionException, InterruptedException {
                if (executionThread.get() == Thread.currentThread()) {
                    // cancel was called by the target itself; waiting would block forever
                    state.set(CANCELLED_IN_MIDDLE_OF_PROGRESS);
                    return;
                }
                // Retry until one of the state transitions succeeds.
                while (true) {
                    if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
                        return;
                    }
                    if (state.get() == CANCELLED_IN_MIDDLE_OF_PROGRESS) {
                        cancellationFuture.get();
                        return;
                    }
                    if (state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION)) {
                        return;
                    }
                    if (state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS)) {
                        // wait until the running execution reaches its finally block
                        cancellationFuture.get();
                        return;
                    }
                }
            }

            @Override
            public void run() {
                if (!state.compareAndSet(NOT_EXECUTING, IN_PROGRESS)) {
                    // already cancelled: skip the target
                    notifyWaiters();
                    return;
                }

                try {
                    executionThread.set(Thread.currentThread());
                    target.run();
                } finally {
                    executionThread.set(null);
                    if (!state.compareAndSet(IN_PROGRESS, NOT_EXECUTING)) {
                        // cancel arrived during this execution
                        notifyWaiters();
                    }
                }
            }

            private void notifyWaiters() {
                if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
                    // no need to notify anything
                    return;
                }
                // someone waits for cancelling
                cancellationFuture.complete(null);
            }

        }

    }
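The state transitions that CancellableCommand relies on can be traced in isolation. The following single-threaded sketch replays the compare-and-set calls in the order they would occur in two cases; the class and method names are hypothetical, and the constants copy the values from the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical walk-through of the state machine; not part of the answer's code.
final class StateTransitionDemo {
    static final int NOT_EXECUTING = 0;
    static final int IN_PROGRESS = 1;
    static final int CANCELLED_WITHOUT_OBSTRUCTION = 2;
    static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3;

    /** cancel() arrives while no execution is running; returns the final state. */
    static int cancelWhileIdle() {
        AtomicInteger state = new AtomicInteger(NOT_EXECUTING);
        // cancel(): the idle transition succeeds, so the caller returns at once
        state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION);
        // a later run(): cannot enter IN_PROGRESS, so the target is skipped
        boolean runStarted = state.compareAndSet(NOT_EXECUTING, IN_PROGRESS);
        return runStarted ? -1 : state.get();
    }

    /** cancel() arrives in the middle of an execution; returns the final state. */
    static int cancelWhileRunning() {
        AtomicInteger state = new AtomicInteger(NOT_EXECUTING);
        // run(): execution starts
        state.compareAndSet(NOT_EXECUTING, IN_PROGRESS);
        // cancel(): the idle transition fails because an execution is in progress
        boolean idleCancel = state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION);
        // cancel(): the mid-run transition succeeds, so the caller must wait
        boolean midCancel = state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS);
        // run()'s finally block: the reset fails, which tells it to notify the waiter
        boolean reset = state.compareAndSet(IN_PROGRESS, NOT_EXECUTING);
        return (!idleCancel && midCancel && !reset) ? state.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(cancelWhileIdle());     // prints 2
        System.out.println(cancelWhileRunning());  // prints 3
    }
}
```

In both cases exactly one side wins each transition, which is what lets the canceller decide whether it has to wait without taking a lock.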

Regarding "java - How to cancel a ScheduledFuture and wait for the runnable to stop if it is in progress at the moment of cancellation?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45383977/
