- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想过滤掉第一个 CompletableFuture
之后的重复项,然后使用另一个 CompletableFuture
调用第二阶段。我尝试过的:
@FunctionalInterface
public interface FunctionWithExceptions<T, R, E extends Exception> {
R process(T t) throws E;
}
public static <T> Predicate<T> distinctByKey(FunctionWithExceptions<? super T, ?, ?> keyExtractor) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> {
String key = "";
try {
key = (String) keyExtractor.process(t);
} catch (Exception e) {
log.info("Get instanceIp failed!");
}
return seen.add(key);
};
}
List<CompletableFuture<InstanceDo>> instanceFutures = podNames.stream()
.map(podName -> CompletableFuture.supplyAsync(RethrowExceptionUtil.rethrowSupplier(() -> {
PodDo podDo = getPodRetriever().getPod(envId, podName);
podDoList.add(podDo);
return podDo;
}), executor))
.map(future -> future.thenApply(podDo -> podDo.getInstanceName()))
.filter(distinctByKey(CompletableFuture::get))
.map(future -> future.thenCompose(instanceName ->
CompletableFuture.supplyAsync(() -> get(envId, instanceName), executor)))
.collect(Collectors.toList());
如您所见,distinctByKey
将调用get
,这将直接使并发变为顺序。
我应该怎样做才能再次并发,同时保留独特功能?
或
我只有一个选择?
等待整个第一阶段完成然后开始第二阶段?
最佳答案
我只是写了一个简单的演示来解决此类问题,但我真的不知道它是否可靠。但至少它确保可以使用Set<Object> seen = ConcurrentHashMap.newKeySet();
加速第二阶段。 。
public static void main(String... args) throws ExecutionException, InterruptedException {
Set<Object> seen = ConcurrentHashMap.newKeySet();
List<CompletableFuture<Integer>> intFutures = Stream.iterate(0, i -> i+1)
.limit(5)
.map(i -> CompletableFuture.supplyAsync(() -> {
int a = runStage1(i);
if (seen.add(a)) {
return a;
} else {
return -1;
}}))
.map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> {
if (i > 0) {
return runStage2(i);
} else {
return i;
}})))
.collect(Collectors.toList());
List<Integer> resultList = new ArrayList<>();
try {
for (CompletableFuture<Integer> future: intFutures) {
resultList.add(future.join());
}
} catch (Exception ignored) {
ignored.printStackTrace();
out.println("Future failed!");
}
resultList.stream().forEach(out::println);
}
private static Integer runStage1(int a) {
out.println("stage - 1: " + a);
try {
Thread.sleep(500 + Math.abs(new Random().nextInt()) % 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(a % 3);
}
private static Integer runStage2(int b) {
out.println("stage - 2: " + b);
try {
Thread.sleep(200 + Math.abs(new Random().nextInt()) % 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(b);
}
通过在第一阶段当重复时返回特殊值,然后在第二阶段,通过特殊值(-1),我可以忽略耗时的第二阶段计算。
输出确实过滤掉了第二阶段的一些冗余计算。
stage - 1: 0
stage - 1: 1
stage - 1: 2
stage - 1: 3
stage - 2: 2 //
stage - 2: 1 //
stage - 1: 4
0
1
2
-1
-1
我认为这不是一个好的解决方案。但我可以优化什么来让它变得更好呢?
关于java - 过滤掉 CompletableFuture 的重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50464051/
我的应用程序中有 3 种不同的方法。全部返回CompletableFuture .我想并行执行方法 1 和 2。完成方法 1 和 2 后,我想使用方法 1 和 2 返回值的参数触发方法 3。 代码示例
我正在尝试加快对多个 API 的调用。 在下面的代码中,getFilteredEvents是当前同步版本。我有这样的感觉 map(x -> x.getFilteredEvents(eventResea
Q1。我的理解是,如果 future 正常或异常完成,就会调用 completableFuture.handle 。但是超时情况又如何呢? 第二季度。在哪里检查 completableFuture 的
我不明白这里发生了什么 CompletableFuture/supplyAsync。 如果我从先前实例化的 CompletableFuture 对象调用 supplyAsync 方法,它永远不会完成:
我有一个返回 CompletableFuture 的异步方法。 private CompletableFuture asyncA(..) 我公开了一个必须返回 CompleteableFuture 的
我在项目中找到了这段代码: int threadCount = 10; CompletableFuture[] futures = new CompletableFuture[threadCount]
让我们举个例子:我们有四种方法: CompletableFututre loadAndApply(SomeObject someObject); CompletableFuture loadData(
我有一个可完成的 future (future1),它创建了 10 个可完成的 future (futureN)。只有当所有 futureN 都完成时,有没有办法将 future1 设置为完成? 最佳
我想编写一个返回 CompletableFuture 的异步方法. future 的唯一目的是跟踪方法何时完成,而不是其结果。返回CompletableFuture会更好吗?或 Completable
我正在对我的数据库进行多次异步调用。我将所有这些异步调用存储在 List> list 上.我想一起收集所有结果,所以我需要等待所有这些调用完成。 一种方法是创建一个 CompletableFuture
我正在尝试使用 CompletableFuture 链接一些文件处理程序,它应该返回 CompletableFuture : CompletableFuture allGen = loadFile1(
我有 2 个 CompletableFuture。 task2 只能在 task1 完成后启动。然后,我需要等待所有任务完成。在下面的代码中,程序在 task1 结束后结束。 task2 开始但未完成
我有 2 个 CompletableFuture。 task2 只能在 task1 完成后启动。然后,我需要等待所有任务完成。在下面的代码中,程序在 task1 结束后结束。 task2 开始但未完成
我很难弄清楚这一点,并且可以向那些比我更有经验和知识的人寻求帮助。 基本问题是我需要获取对象列表,然后对于返回的每个对象,获取一些详细信息,并将详细信息缝合到对象中。我希望在这方面保持高效;我需要首先
我目前正在使用 CompletableFuture supplyAsync() 方法将一些任务提交到公共(public)线程池。这是代码片段的样子: final List>> completableF
为什么是CompletableFuture.allOf声明为 CompletableFuture而不是返回结果集合或其他东西?我认为制作 CompletableFuture.anyOf 是个好主意返回
我没有看到处理具有异步结果的异常的明显方法。 例如,如果我想重试一个异步操作,我会期待这样的事情: CompletionStage cf = askPong("cause error").handle
比如我有这样的方法: public CompletableFuture getPage(int i) { ... } public CompletableFuture getDocument(
我正在调用一个返回 CompletableFuture 的服务。 输出结构如下。 Class Output { public String name; public Integer a
我正在尝试转换 List>至CompletableFuture> .当您有许多异步任务并且需要获取所有这些任务的结果时,这非常有用。 如果其中任何一个失败,那么最终的 future 将失败。这就是我的
我是一名优秀的程序员,十分优秀!