gpt4 book ai didi

java - 线程安全发布者

转载 作者:行者123 更新时间:2023-11-30 03:50:20 25 4
gpt4 key购买 nike

我遇到了一个 Java 并发问题,我以为我已经解决了,但现在我觉得我的解决方案有问题。

问题:

以线程安全且高性能的方式实现缺失的方法。系统应仅订阅每个 T key 的第一个请求,并应在给定键不再有监听器后取消订阅。

interface Listener {
void onData();
}

abstract class Publisher<T> {
public void subscribe(T key, Listener l) {
// TODO complete
}

public void unsubscribe(T key, Listener l) {
// TODO complete
}

public void publish(T key) {
// TODO complete
}

public abstract void reallyLongSubscribeRequest(T key);
public abstract void reallyLongUnsubscribeRequest(T key);
}

我的解决方案是将 key 和监听器存储在 ConcurrentHashMap 中,并使用线程池执行器来运行对非常长的订阅和取消订阅方法的调用:

abstract class Publisher<T> {
private final int POOL_SIZE = 5;

private final ConcurrentHashMap<T, List<Listener>> listeners = new ConcurrentHashMap<>();
private final ScheduledExecutorService stpe = Executors.newScheduledThreadPool(POOL_SIZE);

public void subscribe(T key, Listener l) {
if (listeners.containsKey(key)) {
listeners.get(key).add(l);
} else {
final T keyToAdd = key;
final List<Listener> list = new LinkedList<Listener>();
list.add(l);

Runnable r = new Runnable() {
public void run() {
reallyLongSubscribeRequest(keyToAdd);
listeners.putIfAbsent(keyToAdd, list);
}
};

stpe.execute(r);
}
}

public void unsubscribe(T key, Listener l) {
if (listeners.containsKey(key)) {
List<Listener> list = listeners.get(key);
list.remove(l);

if (list.size() == 0) {
final T keyToRemove = key;
Runnable r = new Runnable() {
public void run() {
reallyLongUnsubscribeRequest(keyToRemove);
}
};
stpe.execute(r);
}
}
}

public void publish(T key) {
if (listeners.containsKey(key)) {
final List<Listener> list = listeners.get(key);
for (Listener l : list) {
l.onData();
}
}
}

public abstract void reallyLongSubscribeRequest(T key);
public abstract void reallyLongUnsubscribeRequest(T key);
}

我现在担心这不再是线程安全的,因为

  1. 订阅中, Activity 线程可以被换出/在进入错误分支和执行 Runnable 之间结束其时间片。如果下一个线程进行相同的调用(相同的键),那么我们将有两个线程想要订阅并写入映射。 putIfAbsent 保持映射一致,但非常长方法将被调用两次(如果它更改了类状态,这很糟糕)。

  2. 与 #1 类似,在 unssubscribe 中,如果线程在进入嵌套 if 的 true 分支和执行 Runnable 之间被换出会怎样?

所以我的问题是

  1. 我的上述担忧是否有效,还是我让问题变得过于复杂(或者我是否误解了时间切片的工作原理)?
  2. 如果是,这个问题可以轻松解决吗?或者有更好/更容易/更简单的解决方案吗?

最佳答案

你有两个问题:

  • 您的订阅取消订阅发布方法应该同步以使其线程安全。

  • 您应该只有一个线程来执行等待队列reallyLong...()调用。您向 Queue 发布一条消息,告知要执行其中一项操作,并且它确实执行了。队列将确保它们一个接一个地发生。

您的代码中也有一个错误。仅当映射中不存在该键时才执行reallyLongSubscribeRequest(...),但在删除最后一个监听器时不会从映射中删除该键。

关于java - 线程安全发布者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24633241/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com