gpt4 book ai didi

java - 如何将 CompletableFuture.supplyAsync 与 PriorityBlockingQueue 一起使用?

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:08:40 24 4
gpt4 key购买 nike

我正在尝试通过 CompletableFuture.supplyAsync 将优先级队列添加到将 ThreadPoolExecutor 与 LinkedBlockingQueue 结合使用的现有应用程序。问题是我无法想出一个设计来分配任务优先级,然后我可以在 PriorityBlockingQueue 的比较器中访问它。这是因为我的任务被 CompletableFuture 包装到一个名为 AsyncSupply 的私有(private)内部类的实例中,该实例将原始任务隐藏在私有(private)字段中。然后使用这些转换为 Runnable 的 AsyncSupply 对象调用 Comparator,如下所示:

public class PriorityComparator<T extends Runnable> implements Comparator<T> {

@Override
public int compare(T o1, T o2) {

// T is an AsyncSupply object.
// BUT I WANT SOMETHING I CAN ASSIGN PRIORITIES TOO!
return 0;
}
}

我调查了扩展 CompletableFuture 的可能性,以便我可以将它包装在不同的对象中,但是 CompletableFuture 的大部分内容都是封装的并且不可继承。因此,扩展它似乎不是一种选择。也没有使用适配器封装它,因为它实现了一个非常广泛的接口(interface)。

除了复制整个 CompletableFuture 并修改它之外,我不确定如何解决这个问题。有什么想法吗?

最佳答案

CompletableFuture 没有提供使用 PriorityBlockingQueue 的直接方法,这似乎是 API 中的一个限制。幸运的是,我们可以毫不费力地破解它。在 Oracle 的 1.8 JVM 中,它们恰好将所有内部类的字段命名为 fn,因此提取我们的优先级感知 Runnable 可以毫不费力地完成:

public class CFRunnableComparator implements Comparator<Runnable> {

@Override
@SuppressWarnings("unchecked")
public int compare(Runnable r1, Runnable r2) {
// T might be AsyncSupply, UniApply, etc., but we want to
// compare our original Runnables.
return ((Comparable) unwrap(r1)).compareTo(unwrap(r2));
}

private Object unwrap(Runnable r) {
try {
Field field = r.getClass().getDeclaredField("fn");
field.setAccessible(true);
// NB: For performance-intensive contexts, you may want to
// cache these in a ConcurrentHashMap<Class<?>, Field>.
return field.get(r);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new IllegalArgumentException("Couldn't unwrap " + r, e);
}
}
}

这假设您的 Supplier 类是 Comparable,类似于:

public interface WithPriority extends Comparable<WithPriority> {
int priority();
@Override
default int compareTo(WithPriority o) {
// Reverse comparison so higher priority comes first.
return Integer.compare(o.priority(), priority());
}
}

public class PrioritySupplier<T> implements Supplier<T>, WithPriority {
private final int priority;
private final Supplier<T> supplier;
public PrioritySupplier(int priority, Supplier<T> supplier) {
this.priority = priority;
this.supplier = supplier;
}
@Override
public T get() {
return supplier.get();
}
@Override
public int priority() {
return priority;
}
}

使用如下:

PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/,
new CFRunnableComparator());
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q);
CompletableFuture.supplyAsync(new PrioritySupplier<>(n, () -> {
...
}), pool);

如果您创建像 PriorityFunctionPriorityBiConsumer 这样的类,您可以使用相同的技术来调用像 thenApplyAsyncwhenCompleteAsync< 这样的方法 以及适当的优先级。

关于java - 如何将 CompletableFuture.supplyAsync 与 PriorityBlockingQueue 一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34866757/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com