- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable
1、 一个由数组支持的有界阻塞队列;
2、 队列按FIFO(先进先出)原则对元素进行排序;
3、 试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞;
4、 支持对等待的生产者线程和使用者线程进行排序的可选公平策略默认情况下,不保证是这种排序然而,通过将公平性(fairness)设置为true而构造的队列允许按照FIFO顺序访问线程公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”;
成员变量
/** 队列元素存储在此数组中 */
private final E[] items;
/** 下一个被take or poll的元素的index */
private int takeIndex;
/** 下一个被put, offer, or add的元素的index */
private int putIndex;
/** 队列中元素数 */
private int count;
/** 可重入的互斥锁 Lock */
private final ReentrantLock lock;
/** 标识队列是否为空,即获取元素的等待条件 */
private final Condition notEmpty;
/** 标识队列是否已满 ,即放入元素的等待条件*/
private final Condition notFull;
构造方法
/**
* 创建一个带有给定的(固定)容量和默认访问策略的ArrayBlockingQueue。
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 创建一个具有给定的(固定)容量和指定访问策略的ArrayBlockingQueue。
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];//实例化指定容量数组
lock = new ReentrantLock(fair);//实例化指定策略的重入锁(公平或非公平)
//实例化判断是否非空的 Condition 实例
notEmpty = lock.newCondition();
//实例化判断是否已满的 Condition 实例
notFull = lock.newCondition();
}
/**
* 创建一个具有给定的(固定)容量和指定访问策略的ArrayBlockingQueue。
* 它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
if (capacity < c.size())
throw new IllegalArgumentException();
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
add(it.next());
}
常用方法
boolean add(E e):将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。
public boolean add(E e) {
//调用的AbstractQueue.add方法,实际在父类调用了本类的offer方法
return super.add(e);
}
boolean offer(E e): 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
public boolean offer(E e) {
//判断元素是否为空
if (e == null) throw new NullPointerException();
//获取当前锁
final ReentrantLock lock = this.lock;
//获取锁(若未获得,会一直去获取,直到取到锁为止)
lock.lock();
try {
//判断当前数组中元素数和数组长度是否相等
//相等,说明数组已满,无法添加新元素,直接返回false
if (count == items.length)
return false;
else {
//数组未满,添加新元素
insert(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
/**
* 插入元素
*/
private void insert(E x) {
items[putIndex] = x;//将元素放入到队尾
putIndex = inc(putIndex);//计算下一个待插入元素的index
++count;//更新当前元素数
notEmpty.signal();//唤醒take相关的等待线程。
}
/**
* 计算下一个待插入元素的index
*/
final int inc(int i) {
return (++i == items.length)? 0 : i;
}
boolean remove(Object o): 从此队列中移除指定元素的单个实例(如果存在)。
public boolean remove(Object o) {
if (o == null) return false;//元素非空判断
final E[] items = this.items;//获取当前元素组
final ReentrantLock lock = this.lock;
lock.lock();//获取锁
try {
int i = takeIndex;//获取下一个被take的索引
int k = 0;
for (;;) {
if (k++ >= count)//循环次数大于实际元素数,退出循环
return false;
if (o.equals(items[i])) {//判断是否是被移除的元素
removeAt(i);//移除元素
return true;
}
i = inc(i);//计算下一个被放入的元素
}
} finally {
lock.unlock();//释放锁
}
}
/**
* 移除指定index的元素
* 若移除最后一个元素,tabkeIndex重置为0
* 若移除0-[count-1]的任意元素,索引i后面的元素依次前移
*/
void removeAt(int i) {
final E[] items = this.items;//获取当前元素组
// if removing front item, just advance
if (i == takeIndex) {//刚刚移除队尾元素
items[takeIndex] = null;//元素置空
takeIndex = inc(takeIndex);//元素全部移除,takeIndex为0
} else {
// slide over all others up through putIndex.
for (;;) {
int nexti = inc(i);//被移除元素的下一个元素的index
if (nexti != putIndex) {//下一个元素待插入的index
items[i] = items[nexti];//所以i后面的元素前移
i = nexti;
} else {
items[i] = null;//队尾元素置空
putIndex = i;//更新实际下一个add的index为先前的队尾
break;
}
}
}
--count;//更新实际元素数
notFull.signal();//唤醒add相关的等待线程。
}
Epoll():获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞,获取锁
try {
if (count == 0)//队列为空,不需要移除
return null;
E x = extract();//移除元素
return x;//返回被移除的元素
} finally {
lock.unlock();//释放锁
}
}
/**
* 移除并返回被移除的元素,获取到锁的前提下,调用此方法
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];//获取下一个takeIndex的元素
items[takeIndex] = null;//元素置空
takeIndex = inc(takeIndex);//计算下一个takeIndex
--count;//更新元素数
notFull.signal();//唤醒add相关线程
return x;//返回被移除的元素
}
Epeek():获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞,获取锁
try {
//无元素,返回空,否则返回下一个待获取的元素
return (count == 0) ? null : items[takeIndex];
} finally {
lock.unlock();//释放锁
}
}
void put(E e):将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//非空校验
final E[] items = this.items;//获取当前元素组
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞获取锁,但响应中断
try {
try {
while (count == items.length)//队列已满
notFull.await();//是否已满进行等待
} catch (InterruptedException ie) {
notFull.signal(); // 唤醒没有中断的线程
throw ie;//抛出线程中断异常
}
insert(e);//队列未满,插入元素
} finally {
lock.unlock();//释放锁
}
}
Etake():获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞时获取锁,但响应中断
try {
try {
while (count == 0)
notEmpty.await();//队列为空,进行等待
} catch (InterruptedException ie) {
notEmpty.signal(); // 唤醒非中断线程
throw ie;
}
E x = extract();//移除元素
return x;//并返回被移除的元素
} finally {
lock.unlock();//释放锁
}
}
由源码可知,ArrayBlockingQueue利用可重入的互斥锁ReentrantLock保证线程安全,并且利用Condition进行阻塞式take或者put。poll或offer支持两种操作限制:(Condition.await() 一直阻塞等待, awaitNanos(long nanosTimeout) 有时间限制的阻塞等待)
public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable
1、 一个基于已链接节点的、范围任意的blockingqueue;
2、 队列按FIFO(先进先出)排序元素;
3、 链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低;
4、 可选的容量范围构造方法参数作为防止队列过度扩展的一种方法如果未指定容量,则它等于Integer.MAX_VALUE除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点;
成员变量
static class Node<E> {//静态内部类
/**
* 当前元素
*/
E item;
/**
* 当前元素的下一个元素
*/
Node<E> next;
Node(E x) { item = x; }
}
/** 链表队列最大容量 */
private final int capacity;
/** 当前元素数 */
private final AtomicInteger count = new AtomicInteger(0);
/** 链表头节点 */
private transient Node<E> head;
/** 链表尾节点 */
private transient Node<E> last;
/** 针对take相关的可重复入的互斥锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** take等待条件 */
private final Condition notEmpty = takeLock.newCondition();
/** 针对put, offer相关的可重复入的互斥锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** put等待条件 */
private final Condition notFull = putLock.newCondition();
构造方法
/**
* 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 创建一个具有给定(固定)容量的 LinkedBlockingQueue。
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();//判断容量值是否合法
this.capacity = capacity;//更新容量最大值
last = head = new Node<E>(null);设置头尾节点为空元素
}
/**
* 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 一直阻塞,直到获取锁
try {
int n = 0;
for (E e : c) {//加强循环c
if (e == null)//元素不能为null
throw new NullPointerException();
if (n == capacity)//队列已满
throw new IllegalStateException("Queue full");
enqueue(e);//将元素放入队列
++n;//元素数+1
}
count.set(n);//设置当前元素数
} finally {
putLock.unlock();//释放锁
}
}
/**
* 将元素插入到链表尾部
*/
private void enqueue(E x) {
last = last.next = new Node<E>(x);
}
常用方法
boolean offer(E e):将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();//非空检查
final AtomicInteger count = this.count;//获取当前元素数
if (count.get() == capacity)//判断元素数是否已到阀值
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;
putLock.lock();//获取add锁
try {
if (count.get() < capacity) {//判断元素数是否已到阀值
enqueue(e);//插入元素到链表尾部
c = count.getAndIncrement();//获取add前元素数,并且更新元素数
if (c + 1 < capacity)//元素数没到阀值,唤醒add相关线程
notFull.signal();
}
} finally {
putLock.unlock();//释放锁
}
if (c == 0)//说明元素已插入,唤醒take相关线程
signalNotEmpty();
return c >= 0;//返回是否已插入成功
}
/**
* 唤醒非空的线程
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//阻塞式获取take锁
try {
notEmpty.signal();//唤醒take相关线程
} finally {
takeLock.unlock();//释放锁
}
}
Epeek():获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
if (count.get() == 0)//判断队列中是否已无元素
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//阻塞式获取take锁
try {
Node<E> first = head.next;//获取头节点的下一个元素
if (first == null)//
return null;
else
return first.item;//返回
} finally {
takeLock.unlock();
}
}
void put(E e):将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();//非空判断
int c = -1;//判断是否添加成功
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;//获取当前元素数
putLock.lockInterruptibly();//获取add锁,可响应线程中断
try {
while (count.get() == capacity) { //链表队列已满,线程等待
notFull.await();
}
enqueue(e);//插入元素到链表尾部
c = count.getAndIncrement();//获取插入前的元素数,并+1
if (c + 1 < capacity)//插入后未超阀值,唤醒add相关线程
notFull.signal();
} finally {
putLock.unlock();//释放锁
}
if (c == 0)
signalNotEmpty();//说明已插入元素,唤醒take相关线程
}
Epoll():获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)//判断队列中是否还有元素
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();//阻塞式获取take锁
try {
if (count.get() > 0) {
x = dequeue();//获取并移除元素
c = count.getAndDecrement();//更新元素数
if (c > 1)
notEmpty.signal();//说明队列中还有元素,唤醒take相关线程
}
} finally {
takeLock.unlock();//释放锁
}
if (c == capacity)
signalNotFull();//移除了一个元素,说明队列未满,唤醒add线程
return x;//返回被移除的元素
}
/**
* 从队列头部移除元素(实际头部元素为:head.next)
*/
private E dequeue() {
Node<E> h = head;//获取头节点
//获取头节点的下一个元素(实际是头节点,因为head.item始终为null)
Node<E> first = h.next;
h.next = h; // help GC(此时head.next指向自身,相当于head.next元素未被引用,便于垃圾回收)
head = first;// 将下一个元素前移
E x = first.item;//获取实际头节点元素
first.item = null;//头节点元素置空(即head.item=null)
return x;//返回头节点元素
}
Etake():取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//阻塞式获取take锁,可响应线程中断
try {
while (count.get() == 0) {//队列中无元素,线程等待
notEmpty.await();
}
x = dequeue();//获取并移除元素
c = count.getAndDecrement();//更新元素数,返回移除前元素数
if (c > 1)
notEmpty.signal();//队列中还有元素,唤醒take相关线程
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
在看源码时,出现过疑惑,即head和last是怎么以链表形式连接起来的。此处要求大家需要区分基本数据类型和引用类型的区别。
具体截取的部分核心源码
//内部类 元素以nodex形式在链表中
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//构造LinkedBlockingQueue实例时,都会执行下面这行代码
last = head = new Node<E>(null); //ps:此时last和head的引用地址是相同的。
//上面代码可分解非下面三行:
Node<E> node0 = new Node<E>(null);
head = node0;
last = head;
//插入类方法最终会调用下面这 代码:
last = last.next = new Node<E>(x);
//可拆分为下面三行:
Node<E> node1 = new Node<E>(x);
last.next = node1;//该行执行完毕时,last.next为node1,同时 head.next也为node1;引用地址的特性,
last = last.next;//此时改变last引用地址,head与last引用地址不相同,且last.next为null;
// head=node0 但值有变化
// add前:head.item=null,head.next=node0;
// add后:head.item=null,head.next=node1;
// last= node0变为:last= node1
// add前:last.item=null,last.next=node0;
// add后:last.item=x,last.next=null;
//由此每次add,通过node.next构成链表式队列。head.item始终为null
//接下来再看看出队(FIFO),从队头开始
public E dequeue() {
Node<E> h = head;//获取头节点
Node<E> first = h.next;
h.next = h; // 此时相当于h.next指向自身,帮助GC回收node0即头节点
head = first;// 将下一个元素前移
E x = first.item;//获取实际头节点元素
first.item = null;//头节点元素置空(即head.item=null)
return x;//返回头节点元素
}
由此看出,LinkedBlockingQueue利用两个锁ReentrantLock(takeLock、putLock)、两个Condition(notEmpty、notFull)进行读写分离,提高效率,利用Node构成链表式队列。
我试图弄清楚以下模块正在做什么。 import Queue import multiprocessing import threading class BufferedReadQueue(Queue.
如果我使用 Queue.Queue,那么我的 read() 函数不起作用,为什么?但是,如果我使用 multiprocessing.Queue,它运行良好: from multiprocessing
我正在寻找比我在文档中找到的更多关于 Python 队列实现的见解。 根据我的理解,如果我在这方面有误,请原谅我的无知: queue.Queue():通过内存中的基本数组实现,因此不能在多个进程之间共
当我使用多处理模块(Windows 上的 Python 2.7)中的队列代替 Queue.Queue 时,我的程序没有完全关闭。 最终,我想使用 multiprocessing.Process 处理
阅读了大量的 JavaScript 事件循环教程,我看到了不同的术语来标识队列存储消息,当调用堆栈为空时,事件循环准备好获取消息: 队列 消息队列 事件队列 我找不到规范的术语来识别它。 甚至 MDN
我收到错误消息“类型队列不接受参数”。当我将更改队列行替换为 PriorityQueue 时,此错误消失并且编译正常。有什么区别以及如何将其更改为编译队列和常规队列? import java.util
如何将项目返回到 queue.Queue?如果任务失败,这在线程或多处理中很有用,这样任务就不会丢失。 docs for queue.Queue.get()说函数可以“从队列中删除并返回一个项目”,但
如何在多个 queue.Queue 上进行“选择”同时? Golang 有 desired feature及其 channel : select { case i1 = 声明。 线程:queue 模
http://docs.python.org/2/library/queue.html#Queue.Queue.put 这似乎是一个幼稚的问题,但我在文档和谷歌搜索中都没有找到答案,那么这些方法是线程
这可能是个愚蠢的问题,但我对与 .dequeue() 和 $.queue() 一起使用的 .queue() 感到困惑> 或 jquery.queue()。 它们是否相同,如果是,为什么 jquery
我正在尝试创建一个线程化的 tcp 流处理程序类线程和主线程对话,但是 Queue.Queue 也没有做我需要的,服务器从另一个程序接收数据,我只想传递它进入主线程进行处理这里是我到目前为止的代码:
The principal challenge of multi-threaded applications is coordinating threads that share data or ot
在Queue模块的queue类中,有几个方法,分别是qsize、empty 和 full,其文档声称它们“不可靠”。 他们到底有什么不可靠的地方? 我确实注意到 on the Python docs网
我需要一个队列,多个线程可以将内容放入其中,并且多个线程可以从中读取。 Python 至少有两个队列类,Queue.Queue 和 collections.deque,前者似乎在内部使用后者。两者都在
明天我将介绍我选择进程内消息队列实现的基本原理,但我无法阐明我的推理。我的合作设计者提议我们实现一个简单的异步队列,只使用基本的作业列表和互斥锁来控制访问,我建议在嵌入式模式下使用 ActiveMQ。
在 scala 中定义了一个特征: trait Queue[T] Queue 是一种类型吗?或其他东西,例如类型构造函数? 来自 http://artima.com/pins1ed/type-para
我看到 SML/NJ 包含一个队列结构。我不知道如何使用它。如何使用 SML/NJ 提供的附加库? 最佳答案 Queue structure SML '97 未指定,但它存在于 SML/NJ 的顶级环
我是 D3 和 JavaScript 的新手。 我试图理解其中的 queue.js。 我已经完成了 this关联。但是仍然无法清楚地了解 queue.await() 和 queue.awaitAll(
所以我试图在我的 main.cpp 文件中调用一个函数,但我得到“错误:没有匹配函数来调用‘Queue::Queue()。” 队列.h #ifndef QUEUE_H #define QUEUE_H
假设我有一个 10 行的二维 numpy 数组 例如 array([[ 23425. , 521331.40625], [ 23465. , 521246.03125],
我是一名优秀的程序员,十分优秀!