gpt4 book ai didi

java - 过滤掉 CompletableFuture 的重复项

转载 作者:太空宇宙 更新时间:2023-11-04 10:26:39 24 4
gpt4 key购买 nike

我想过滤掉第一个 CompletableFuture 之后的重复项,然后使用另一个 CompletableFuture 调用第二阶段。我尝试过的:

@FunctionalInterface
public interface FunctionWithExceptions<T, R, E extends Exception> {
R process(T t) throws E;
}


public static <T> Predicate<T> distinctByKey(FunctionWithExceptions<? super T, ?, ?> keyExtractor) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> {
String key = "";
try {
key = (String) keyExtractor.process(t);
} catch (Exception e) {
log.info("Get instanceIp failed!");
}
return seen.add(key);
};
}

List<CompletableFuture<InstanceDo>> instanceFutures = podNames.stream()
.map(podName -> CompletableFuture.supplyAsync(RethrowExceptionUtil.rethrowSupplier(() -> {
PodDo podDo = getPodRetriever().getPod(envId, podName);
podDoList.add(podDo);
return podDo;
}), executor))
.map(future -> future.thenApply(podDo -> podDo.getInstanceName()))
.filter(distinctByKey(CompletableFuture::get))
.map(future -> future.thenCompose(instanceName ->
CompletableFuture.supplyAsync(() -> get(envId, instanceName), executor)))
.collect(Collectors.toList());

如您所见,distinctByKey调用get,这将直接使并发变为顺序

我应该怎样做才能再次并发,同时保留独特功能?

我只有一个选择?

等待整个第一阶段完成然后开始第二阶段

最佳答案

我只是写了一个简单的演示来解决此类问题,但我真的知道它是否可靠。但至少它确保可以使用Set<Object> seen = ConcurrentHashMap.newKeySet();加速第二阶段。 。

public static void main(String... args) throws ExecutionException, InterruptedException {
Set<Object> seen = ConcurrentHashMap.newKeySet();
List<CompletableFuture<Integer>> intFutures = Stream.iterate(0, i -> i+1)
.limit(5)
.map(i -> CompletableFuture.supplyAsync(() -> {
int a = runStage1(i);
if (seen.add(a)) {
return a;
} else {
return -1;
}}))
.map(future -> future.thenCompose(i -> CompletableFuture.supplyAsync(() -> {
if (i > 0) {
return runStage2(i);
} else {
return i;
}})))
.collect(Collectors.toList());
List<Integer> resultList = new ArrayList<>();
try {
for (CompletableFuture<Integer> future: intFutures) {
resultList.add(future.join());
}
} catch (Exception ignored) {
ignored.printStackTrace();
out.println("Future failed!");
}
resultList.stream().forEach(out::println);
}

private static Integer runStage1(int a) {
out.println("stage - 1: " + a);
try {
Thread.sleep(500 + Math.abs(new Random().nextInt()) % 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(a % 3);
}

private static Integer runStage2(int b) {
out.println("stage - 2: " + b);
try {
Thread.sleep(200 + Math.abs(new Random().nextInt()) % 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

return Integer.valueOf(b);
}

通过在第一阶段当重复时返回特殊值,然后在第二阶段,通过特殊值(-1),我可以忽略耗时的第二阶段计算。

输出确实过滤掉了第二阶段的一些冗余计算。

stage - 1: 0
stage - 1: 1
stage - 1: 2
stage - 1: 3
stage - 2: 2 //
stage - 2: 1 //
stage - 1: 4
0
1
2
-1
-1

我认为这不是一个好的解决方案。但我可以优化什么来让它变得更好呢?

关于java - 过滤掉 CompletableFuture 的重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50464051/

24 4 0