gpt4 book ai didi

Java 在两个 ConcurrentMap 上发生一致的线程 View

转载 作者:太空宇宙 更新时间:2023-11-04 13:15:40 27 4
gpt4 key购买 nike

我有一个java类来处理多线程订阅服务。通过实现 Subscribable 接口(interface),可以将任务提交到服务并定期执行。代码草图如下所示:

import java.util.concurrent.*;

public class Subscribtions {

private ConcurrentMap<Subscribable, Future<?>> futures = new ConcurrentHashMap<Subscribable, Future<?>>();
private ConcurrentMap<Subscribable, Integer> cacheFutures = new ConcurrentHashMap<Subscribable, Integer>();
private ScheduledExecutorService threads;

public Subscribtions() {
threads = Executors.newScheduledThreadPool(16);
}

public void subscribe(Subscribable subscription) {
Runnable runnable = getThread(subscription);
Future<?> future = threads.scheduleAtFixedRate(runnable, subscription.getInitialDelay(), subscription.getPeriod(), TimeUnit.SECONDS);
futures.put(subscription, future);
}

/*
* Only called from controller thread
*/
public void unsubscribe(Subscribable subscription) {
Future<?> future = futures.remove(subscription); //1. Might be removed by worker thread
if (future != null)
future.cancel(false);
else {
//3. Worker-thread view := cacheFutures.put() -> futures.remove()
//4. Controller-thread has seen futures.remove(), but has it seen cacheFutures.put()?
}
}

/*
* Only called from worker threads
*/
private void delay(Runnable runnable, Subscribable subscription, long delay) {
cacheFutures.put(subscription, 0); //2. Which is why it is cached first
Future<?> currentFuture = futures.remove(subscription);
if (currentFuture != null) {
currentFuture.cancel(false);
Future<?> future = threads.scheduleAtFixedRate(runnable, delay, subscription.getPeriod(), TimeUnit.SECONDS);
futures.put(subscription, future);
}
}

private Runnable getThread(Subscribable subscription) {
return new Runnable() {
public void run() {
//Do work...
boolean someCondition = true;
long someDelay = 100;
if (someCondition) {
delay(this, subscription, someDelay);
}
}
};
}

public interface Subscribable {
long getInitialDelay();
long getPeriod();
}
}

所以该类允许:

  • 订阅新任务
  • 取消订阅现有任务
  • 延迟定期执行的任务

订阅由外部控制线程添加/删除,但延迟仅由内部工作线程引起。如果例如工作线程没有发现上次执行的更新,或者例如如果线程只需要从 00.00 - 23.00 执行。

我的问题是工作线程可能会调用delay()并将其 future 从ConcurrentMap中删除,而 Controller 线程可能会同时调用unsubscribe()。然后,如果 Controller 线程在工作线程放入新的 future 之前检查 ConcurrentMap,则 unsubscribe() 调用将会丢失。

有一些(可能不是详尽的列表)解决方案:

  • delay()unsubscribe() 方法之间使用锁
  • 与上面相同,但每次订阅一把锁
  • (首选?)不使用锁,但“缓存”删除了 delay() 方法中的 future

对于第三种方案,既然worker-thread已经建立了happens-before关系cacheFutures.put() -> futures.remove(),并且ConcurrentMap的原子性使得controller线程看到了futures.remove(),那么它是否也看到了和worker线程一样的happens-before关系呢? IE。 cacheFutures.put() -> futures.remove()?或者原子性仅适用于 futures 映射,并更新稍后传播的其他变量?

也欢迎任何其他评论,尤其是。考虑使用 volatile 关键字。缓存映射是否应该声明为 volatile 的?谢谢!

最佳答案

每个订阅一个锁将要求您维护另一个映射,并可能因此引入额外的并发问题。我认为最好避免这种情况。这同样适用于缓存已删除的订阅,而且这会带来不必要的资源保留的额外风险(请注意,您需要缓存的不是 Future 本身,而是与它们关联的 Subscribable)。

无论如何,您都需要某种同步/锁定。例如,在选项 (3) 中,您需要避免在 delay() 缓存该订阅和删除其 Future 之间发生给定订阅的 unsubscribe()。在没有某种形式的锁定的情况下,避免这种情况的唯一方法是,每个订阅只使用一个 Future,从 subscribe() 注册它起一直保持在适当的位置,直到它被 unsubscribe() 删除。这样做与延迟已安排的订阅的能力不一致。

As for the third solution, since the worker-thread has established the happens-before relationship cacheFutures.put() -> futures.remove(), and the atomicity of ConcurrentMap makes the controller thread see futures.remove(), does it also see the same happens-before relationship as the worker thread?

Happens-before 是程序执行过程中 Action 之间的关系。它不特定于任何一个线程的执行 View 。

Or does the atomicity only hold for the futures map with updates to other variables being propagated later?

Controller 线程将始终看到由 delay() 调用执行的 cacheFutures.put() 发生在由同一调用执行的 futures.remove() 之前。不过,我认为这对你没有帮助。

Should the cache-map be declared volatile?

没有。这毫无用处,因为尽管该 map 的内容发生了变化,但 map 本身始终是同一个对象,并且对它的引用不会改变。

您可以考虑让 subscribe()delay()unsubscribe() 分别在所呈现的 Subscribable 上进行同步。这不是我理解的每个订阅都有锁的意思,但它是相似的。它将避免需要单独的数据结构来维护此类锁。我想如果您想避免显式同步,您也可以在 Subscribable 接口(interface)中构建锁定方法。

关于Java 在两个 ConcurrentMap 上发生一致的线程 View ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33555545/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com