gpt4 book ai didi

java - 固定和动态调整工作线程的数量

转载 作者:行者123 更新时间:2023-12-02 06:26:22 24 4
gpt4 key购买 nike

我目前正在实现一个数据容器/数据结构,它具有 addRecord(final Redcord record) 和 close() 方法。

public class Container{

Container() {
}

public void addRecord(final Record record){
// do something with the record
// add the modified it to the container
}

public void close(){

}
}

由于为了将记录存储到容器中,需要完成几个步骤,因此我想在最终将它们添加到容器之前将这些步骤外包给线程。

我的问题是,我不希望任何容器实例使用超过 4 个线程,并且如果我同时打开多个容器实例,则总共不应使用超过 12 个线程。除此之外,我希望一旦创建了第 4 个容器,其他 3 个打开的容器中的每一个都应该丢失其中一个线程,这样所有 4 个容器都可以各使用 3 个线程。

我正在研究 ThreadPools 和 ExecutorServices。虽然设置大小为 12 的线程池很简单,但在使用 ExecutorServices 时很难将使用的线程数限制为 4。回想一下,我最多需要 12 个线程,这就是为什么我有一个在容器实例之间共享的大小为 12 的静态实例。

这就是我开始的

public class Container{

public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);

final List<Future<Entry>> _fRecords = new LinkedList<>();

Container() {
}

public void addRecord(final Record record){
_fRecords.add(SERVICE.submit(new Callable<Entry>{

@Override
public Entry call() throws Exception {
return record.createEntry();
}
}));
}

public void close() throws Exception {
for(final Future<Entry> f) {
f.get(); // and add the entry to the container
}
}
}

显然,这不能确保单个实例最多使用 4 个线程。此外,如果另一个容器需要几个线程,我不知道如何强制这些 Executorservices 关闭工作线程或重新分配它们。

问题:

  1. 由于Redord本身是一个接口(interface),并且createEntry()的运行时间取决于实际的实现,所以我想确保不同的容器不使用相同的线程/工作线程,即我不希望任何线程都为两个不同的容器执行工作。这背后的想法是,我不希望仅处理“长时间运行”记录的容器实例被仅处理“短期运行记录”的容器减慢。如果这在概念上没有意义(对你来说),则不需要为不同的容器提供隔离的线程/工作线程。

  2. 实现我自己的 ExecutorService 是正确的方法吗?有人可以向我指出一个具有类似问题/解决方案的项目吗?否则我想我必须实现自己的 ExecutorService 并在我的容器之间共享它,或者有更好的解决方案吗?

  3. 目前,我正在 close 方法中收集所有 future,因为容器必须按照记录/条目到达的顺序存储它们。显然,这需要大量时间,因此我想在 Future 完成后立即执行此操作。让额外的工作人员单独处理 future 是否有意义,或者让我的工作人员完成这项工作(即进行线程交互)更好。

最佳答案

我不认为 Java 运行时提供的东西可以满足您开箱即用的需求。

我编写了自己的类来满足此类需求(公共(public)池+给定队列的限制)。

public static class ThreadLimitedQueue {
private final ExecutorService executorService;
private final int limit;

private final Object lock = new Object();
private final LinkedList<Runnable> pending = new LinkedList<>();
private int inProgress = 0;

public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
this.executorService = executorService;
this.limit = limit;
}

public void submit(Runnable runnable) {
final Runnable wrapped = () -> {
try {
runnable.run();
} finally {
onComplete();
}
};
synchronized (lock) {
if (inProgress < limit) {
inProgress++;
executorService.submit(wrapped);
} else {
pending.add(wrapped);
}
}
}

private void onComplete() {
synchronized (lock) {
final Runnable pending = this.pending.poll();
if (pending == null || inProgress > limit) {
inProgress--;
} else {
executorService.submit(pending);
}
}
}
}

我的情况唯一的区别limit是常量,但您可以修改它。例如,您可以替换 int limitSupplier<Integer> limitFunction并且该函数必须提供动态限制,例如

Supplier<Integer> limitFunction = 12 / containersList.size();

只是让它更健壮(例如,如果containersList为空或超过12该怎么办)

关于java - 固定和动态调整工作线程的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55784995/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com