gpt4 book ai didi

java - 向 ArrayBlockingQueue 添加功能

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:07:15 26 4
gpt4 key购买 nike

我正在尝试向 ArrayBlockingQueue 添加功能,特别是我希望队列只保留唯一元素,即如果条目已包含在队列中,则不将其加入队列。由于我想要的功能与 JCIP 的第 4.4 项中的 Vector 扩展相同,因此我尝试使用那里的方法实现它。

  • 通过扩展实现是行不通的,因为 ArrayBlockingQueue 使用包私有(private) ReentrantLock 实现它的互斥,所以作为扩展类我无法获得对它的引用。即使它确实有效,这也是一种脆弱的方法。
  • 客户端锁定的实现不起作用,因为没有客户端锁定支持。
  • 一开始似乎是通过组合实现,生成如下代码

    public class DistinctBlockingQueue<E> implements BlockingQueue<E> {
    private final BlockingQueue<E> backingQueue;

    public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
    this.backingQueue = backingQueue;
    }

    @Override
    public synchronized boolean offer(E e) {
    if (backingQueue.contains(e)) {
    return false;
    }

    return backingQueue.offer(e);
    }

    @Override
    public synchronized E take() throws InterruptedException {
    return backingQueue.take();
    }

    // Other methods...
    }

    不幸的是,在组成 ArrayBlockingQueue 时,这种方法会在以下简单场景中产生死锁:

    1. 线程A调用take()获取同步锁和ArrayBlockingQueue的内锁。
    2. 线程 A 在看到队列为空时阻塞并释放 ArrayBlockingQueue 的内部锁。
    3. 线程 B 使用元素调用 offer() 但无法获取同步锁,永远阻塞。

我的问题是,如何在不重写 ArrayBlockingQueue 的情况下实现这个功能?

最佳答案

也许一种简单快速的解决方案是使用 java.util.concurrent.ConcurrentMap :

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {

private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}

@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
return add[0] && backingQueue.offer(e);
}

@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}

// Other methods

}

请注意,不需要同步

编辑:

java.util.concurrent.ConcurrentHashMap 的文档说:

/**
* If the specified key is not already associated with a value,
* attempts to compute its value using the given mapping function
* and enters it into this map unless {@code null}. The entire
* method invocation is performed atomically, so the function is
* applied at most once per key. Some attempted update operations
* on this map by other threads may be blocked while computation
* is in progress, so the computation should be short and simple,
* and must not attempt to update any other mappings of this map.
*
* @param key key with which the specified value is to be associated
* @param mappingFunction the function to compute a value
* @return the current (existing or computed) value associated with
* the specified key, or null if the computed value is null
* @throws NullPointerException if the specified key or mappingFunction
* is null
* @throws IllegalStateException if the computation detectably
* attempts a recursive update to this map that would
* otherwise never complete
* @throws RuntimeException or Error if the mappingFunction does so,
* in which case the mapping is left unestablished
*/
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
...
}

我添加了一些额外的检查:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class DistinctBlockingQueue<E> implements BlockingQueue<E> {

private final BlockingQueue<E> backingQueue;
private final ConcurrentMap<E, Boolean> elements = new ConcurrentHashMap<>();

public DistinctBlockingQueue(BlockingQueue<E> backingQueue) {
this.backingQueue = backingQueue;
}

@Override
public boolean offer(E e) {
boolean[] add = {false};
elements.computeIfAbsent(e, k -> add[0] = true);
if (add[0]) {
// make sure that the element was added to the queue,
// otherwise we must remove it from the map
if (backingQueue.offer(e)) {
return true;
}
elements.remove(e);
}
return false;
}

@Override
public E take() throws InterruptedException {
E e = backingQueue.take();
elements.remove(e);
return e;
}

@Override
public String toString() {
return backingQueue.toString();
}

// Other methods

}

然后...让我们做一些并发测试:

BlockingQueue<String> queue = new DistinctBlockingQueue<>(new ArrayBlockingQueue<>(100));

int n = 1000;
ExecutorService producerService = Executors.newFixedThreadPool(n);

Callable<Void> producer = () -> {
queue.offer("a");
return null;
};

producerService.invokeAll(IntStream.range(0, n).mapToObj(i -> producer).collect(Collectors.toList()));
producerService.shutdown();

System.out.println(queue); // prints [a]

关于java - 向 ArrayBlockingQueue 添加功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37383859/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com