- 921. Minimum Add to Make Parentheses Valid 使括号有效的最少添加
- 915. Partition Array into Disjoint Intervals 分割数组
- 932. Beautiful Array 漂亮数组
- 940. Distinct Subsequences II 不同的子序列 II
public class DelayQueue extends AbstractQueue implements BlockingQueue
1、 Delayed元素的一个基于优先级的无界阻塞队列,只有在延迟期满时才能从中提取元素;
2、 如果延迟都还没有期满,则队列没有头部,并且poll将返回null;
3、 不允许使用null元素;
成员变量
/**
* 可重入的互斥锁
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 一个基于优先级堆的无界优先级队列。可自然排序。不允许使用 null 元素。
*/
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
* 唤醒或等待take线程的条件
*/
private final Condition available = lock.newCondition();
构造方法
/**
* 创建一个最初为空的新 DelayQueue。
*/
public DelayQueue() {}
/**
* 创建一个最初包含 Delayed 实例的给定 collection 元素的 DelayQueue。
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
常用方法
boolean add(E e):将指定元素插入此延迟队列中。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e):将指定元素插入此延迟队列。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
E first = q.peek();//获取但不移除此队列的头部;如果此队列为空,则返回 null。
q.offer(e);//将指定的元素插入此优先级队列。
if (first == null || e.compareTo(first) < 0)
available.signalAll();//队列中无元素,唤醒take线程
return true;
} finally {
lock.unlock();//释放锁
}
}
Epeek():获取但不移除此队列的头部;如果此队列为空,则返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
return q.peek();// 获取但不移除此队列的头;如果此队列为空,则返回 null。
} finally {
lock.unlock();//释放锁
}
}
Epoll(): 获取并移除此队列的头,如果此队列不包含具有已到期延迟时间的元素,则返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();// 获取但不移除此队列的头;如果此队列为空,则返回 null。
//getDelay:返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;//队列为空或元素未到期
else {
E x = q.poll();//获取并移除此队列的头,如果此队列为空,则返回 null。
assert x != null;//非空校验
if (q.size() != 0)//队列中还有元素
available.signalAll();//唤醒take线程
return x;
}
} finally {
lock.unlock();
}
}
void put(E e):将指定元素插入此延迟队列。
public void put(E e) {
offer(e);
}
Etake():获取并移除此队列的头部,在可从此队列获得到期延迟的元素之前一直等待(如有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞式获取锁,可响应线程中断
try {
for (;;) {
E first = q.peek();//获取但不移除此队列的头;如果此队列为空,则返回 null。
if (first == null) {
available.await();//队列为空,线程等待,释放锁
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//获取元素剩余到期市场,并判断是否到期
if (delay > 0) {
long tl = available.awaitNanos(delay);//元素还未到期,线程等待指定时间,释放锁
} else {
E x = q.poll();//获取并移除此队列的头,如果此队列为空,则返回 null。
assert x != null;//非空校验
if (q.size() != 0)
available.signalAll(); // 线程非空,唤醒其它所有take线程 return x;//返回队头元素
}
}
}
} finally {
lock.unlock();//释放锁
}
}
由源码看出延迟队列DelayQueue操作元素是通过PriorityQueue实现的,PriorityQueue是一个基于优先级堆的无界优先级队列。利用可重入的互斥锁ReentrantLock保证线程安全,同时利用Condition保证插入或获取元素是阻塞的。
public class PriorityQueue extends AbstractQueue implements java.io.Serializable
1、 一个基于优先级堆的无界优先级队列;
2、 优先级队列不允许使用null元素;
3、 默认容量11,当元素容量小于64时,扩容double,否则扩容50%;
4、 不是同步的;
成员变量
/**
* 初始容量
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 元素以平衡二叉树形式存储
*/
private transient Object[] queue;
/**
* 优先级队列元素数
*/
private int size = 0;
/**
* 元素自然排序方式
*/
private final Comparator<? super E> comparator;
/**
* 优先级队列修改次数
*/
private transient int modCount = 0;
构造方法
/**
* 使用默认的初始容量(11),并根据其自然顺序对元素进行排序。
*/
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 使用指定的初始容量创建Queue,并根据指定的比较器对元素进行排序。
*/
public PriorityQueue(int initialCapacity,
Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}
常用方法
boolean add(E e):将指定的元素插入此优先级队列。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e): 将指定的元素插入此优先级队列。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;//修改次数+1
int i = size;
if (i >= queue.length)
grow(i + 1);//元素数可能不够,需要扩容
size = i + 1;
if (i == 0)//队列为空
queue[0] = e;
else
siftUp(i, e);
return true;
}
/**
* 队列扩容
*/
private void grow(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
int oldCapacity = queue.length;//获取当前元素数
// <64 双倍扩容; >=64 扩容 50%
int newCapacity = ((oldCapacity < 64)?
((oldCapacity + 1) * 2):
((oldCapacity / 2) * 3));
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;//最大边界
if (newCapacity < minCapacity)
newCapacity = minCapacity;
queue = Arrays.copyOf(queue, newCapacity);//数组复制
}
//选择排序方式并插入相应位置
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
/**
* Comparable 方式排序
*/
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
/**
* Comparator 方式排序
*/
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
Epeek(): 获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
if (size == 0)
return null;
return (E) queue[0];
}
Epoll() :获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
if (size == 0)
return null;
int s = --size;//更新元素数
modCount++;//修改次数+1
E result = (E) queue[0];//获取队列头部元素
E x = (E) queue[s];//获取尾部元素
queue[s] = null;//末尾置空
if (s != 0)//队列中还有元素
siftDown(0, x);
return result;
}
/**
* 元素重新排序
*/
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
/**
* Comparable方式重新排序
*/
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
/**
* Comparator方式重新排序
*/
private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
由源码看出,PriorityQueue是非线程安全的,利用Comparator或Comparable进行自然排序,从而实现有优先级的队列,以平衡二叉树的形式存储在transient Object[] queue中,虽然api中介绍说是无界队列,但从源码看出其实是有边界的 ,值为Integer.MAX_VALUE;只是边界特别大,从某种程度上来说,可以理解为无边界。
public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable
1、 一个无界阻塞队列,它使用与类PriorityQueue相同的顺序规则,并且提供了阻塞获取操作;
2、 此队列逻辑上是无界的,但是资源被耗尽时试图执行add操作也将失败(导致OutOfMemoryError);
3、 不允许使用null元素;
4、 不允许插入不可比较的对象;
成员变量
//元素操作均基于PriorityQueue
private final PriorityQueue<E> q;
//可重入的互斥锁,该处采用公平锁
private final ReentrantLock lock = new ReentrantLock(true);
//take条件
private final Condition notEmpty = lock.newCondition();
构造方法
/**
* 用默认的初始容量 (11) 创建一个 PriorityBlockingQueue,并根据元素的自然顺序对其元素进行排序。
*/
public PriorityBlockingQueue() {
q = new PriorityQueue<E>();
}
/**
* 使用指定的初始容量创建一个 PriorityBlockingQueue,并根据指定的比较器对其元素进行排序。
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
q = new PriorityQueue<E>(initialCapacity, comparator);
}
/**
* 使用指定的初始容量创建一个 PriorityBlockingQueue,并根据元素的自然顺序对其元素进行排序。
*/
public PriorityBlockingQueue(int initialCapacity) {
q = new PriorityQueue<E>(initialCapacity, null);
}
/**
* 创建一个包含指定 collection 元素的 PriorityBlockingQueue。
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
q = new PriorityQueue<E>(c);
}
常用方法
boolean add(E e):将指定元素插入此优先级队列。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e):将指定元素插入此优先级队列。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
boolean ok = q.offer(e);//插入元素
assert ok;
notEmpty.signal();//唤醒take线程
return true;
} finally {
lock.unlock();//释放锁
}
}
Epeek():获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
return q.peek();//获取元素
} finally {
lock.unlock();//释放锁
}
}
Epoll():获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
return q.poll();//获取并移除此队列的头
} finally {
lock.unlock();//释放锁
}
}
Etake():获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞式获取锁,可响应线程中断
try {
try {
while (q.size() == 0)
notEmpty.await();//队列中无元素,阻塞等待,释放锁
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();//有元素,获取并移除元素
assert x != null;
return x;//返回元素
} finally {
lock.unlock();//释放锁
}
}
void put(E e):将指定元素插入此优先级队列。
public void put(E e) {
offer(e); // never need to block
}
有源码看出,PriorityBlockingQueue是基于PriorityQueue实现具有优先级的无界阻塞队列,利用ReentrantLock实现线程安全,Condition实现阻塞。
DelayQueue及PriorityBlockingQueue实现具有优先级的无界阻塞队列都是基于PriorityQueue的,区别在于DelayQueue加入了延迟概念。虽说都是无界,但最大边界为:Integer.MAX_VALUE。
我需要一个队列来自动删除早于给定毫秒数的元素 - 基本上,我希望队列中的项目在一段时间后过期。 我看到有一个延迟队列似乎在做相反的事情:“一个元素只能在其延迟到期时被采用。” (我从未使用过它)。 也
在 Java 7 中,DelayQueue 的实现使用了没有公平策略的 ReentrantLock。从长远来看,这是一个问题吗?线程会因此而饿死吗? 谢谢 最佳答案 如果您考虑 ScheduledTh
我遇到了与作者类似的问题: DelayQueue with higher speed remove()? 问题:我需要处理连续传入的数据并检查数据是否在之前的某个时间范围内被看到过。因此,我计算传入数
我有以下据称非常简单的 DelayQueue 演示. class DelayedThing implements Delayed { private final long waitUntil;
我刚开始用 java 编码,我正在努力设置 DelayQueue, 我想这样, DelayQueue queue = new DelayQueue(); If (counter > 0){ queue
我将在模拟 parking 场的程序中使用 Collections 接口(interface)中的 DelayQueue。我想知道在没有元素过期的情况下是否有多个 take 方法调用队列,最后一个 t
下面的 java 代码示例使用 java DelayQueue 来处理任务。然而,从另一个线程插入任务似乎会破坏(我的)预期行为。 很抱歉代码示例太长,但总而言之: 主线程将 5 个任务 (A-E)
我正在使用 LinkedBlockingQueue 队列来实现用于 TCP/IP 事件传输的生产者-消费者模式,我正在使用 boolean offer(e)这意味着一旦队列达到其容量,新传入的事件将被
我正在尝试创建一个 ThreadPoolExecutor: // Thingy implements Delayed and Runnable ExecutorService executor = n
我想要一个DelayQueue计划的 Runnable 的数量,其中每个 Runnable 只能在预先指定的某个时间点之后运行。因此,线程可以继续从该队列中删除可运行对象并处理事件计划。为什么 Del
我想遍历我的 DelayQueue 中未过期的元素。类Transaction实现了Delayed,有一个字段timestamp,代表一笔交易发起时的UTC时间戳(不是当前时间戳) public cla
我是一名优秀的程序员,十分优秀!