gpt4 book ai didi

java - 具有最小和最大阈值设置的池填充服务

转载 作者:行者123 更新时间:2023-11-30 10:58:13 25 4
gpt4 key购买 nike

我想创建一个包含某些对象的池(无限)的应用程序。当池的大小达到某个最小阈值时,我想启动我的池填充服务并生成一定数量的新对象(直到达到最大阈值)。

这是 producer-consumer 问题的略微修改版本,但不知何故我卡住了。由于创建大小有限的 BlockingQueue 并保持其填充很容易,我不知道如何解决我的问题。

我尝试使用 ReentrantLock 并且它是 Condition 对象,但是 Condition#signal() 方法要求我在锁内,这完全是对我来说是不必要的。

在我看来,最好的解决方案是 CountDownLatch。消费者会减少计数器并最终触发池填充服务。这里的问题是 CountDownLatch 无法自行重启。

有什么想法吗?

换句话说:我有一堆消费者线程和一个生产者线程。生产者应该等到达到最小阈值,生产一些对象,然后再次等待。

最佳答案

Semaphore 可以作为生产者的屏障并且可以重复使用。当 SemaphoreAtomicBoolean 组合时,生产者可以在不影响消费者的情况下工作。它确实需要池来处理填充逻辑。
在下面的实现中,生产者立即开始填充池,然后等待池达到其最小大小。

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// http://stackoverflow.com/q/32358084/3080094
public class RandomWell {

public static void main(String[] args) {

try {

final FilledPool<Integer> pool = new FilledPool<Integer>(100, 1000);
final CountDownLatch syncStart = new CountDownLatch(3);

Thread consumer = new Thread() {
@Override public void run() {
// just to do something, keep track of amount of positive ints from pool
int positiveInt = 0;
int totalInts = 0;
try {
syncStart.countDown();
syncStart.await();
for(;;) {
int i = pool.take();
if (i > 0) {
positiveInt++;
}
totalInts++;
Thread.yield();
}
} catch (InterruptedException e) {
System.out.println("Consumer stopped: " + positiveInt + " / " + (totalInts - positiveInt));
} catch (Exception e) {
e.printStackTrace();
}
}
};
consumer.start();

Thread producer = new Thread() {
@Override public void run() {
try {
Random r = new Random();
syncStart.countDown();
syncStart.await();
for(;;) {
int fillTotal = 0;
while (!pool.isMinFilled()) {
int fill = pool.getFillSize();
for (int i = 0; i < fill; i++) {
pool.offer(r.nextInt());
}
fillTotal += fill;
// System.out.println("Pool size: " + pool.sizeFast());
}
System.out.println("Filled " + fillTotal);
pool.awaitNewFilling();
}
} catch (InterruptedException e) {
System.out.println("Producer stopped.");
} catch (Exception e) {
e.printStackTrace();
}
}
};
producer.start();
syncStart.countDown();
syncStart.await();

Thread.sleep(100);

producer.interrupt();
consumer.interrupt();

} catch (Exception e) {
e.printStackTrace();
}
}

static class FilledPool<E> {

private final LinkedBlockingQueue<E> pool;
private final int minSize;
private final int maxSize;
private final Semaphore needFilling = new Semaphore(0);

// producer starts filling initially
private final AtomicBoolean filling = new AtomicBoolean(true);

public FilledPool(int minSize, int maxSize) {
super();
this.minSize = minSize;
this.maxSize = maxSize;
pool = new LinkedBlockingQueue<E>();
}

public E take() throws InterruptedException {

triggerFilling();
E e = pool.take();
return e;
}

private void triggerFilling() {

if (!isFilling() && !isMinFilled() && filling.compareAndSet(false, true)) {
needFilling.release();
System.out.println("Filling triggered.");
}
}

public void offer(E e) { pool.offer(e); }

public void awaitNewFilling() throws InterruptedException {

// must check on minimum in case consumers outpace producer
if (isMinFilled()) {
filling.set(false);
needFilling.acquire();
}
}

public int size() { return pool.size(); }
public boolean isMinFilled() { return minSize < size(); }
public int getFillSize() { return maxSize - size(); }
public boolean isFilling() { return filling.get(); }
}
}

更新:我还设法使用 ConcurrentLinkedQueue 而不是 LinkedBlockingQueue 让它工作,这大约使吞吐量增加了一倍,但同时也使代码的复杂性增加了一倍。

关于java - 具有最小和最大阈值设置的池填充服务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32358084/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com