gpt4 book ai didi

java - 有没有一种好方法将 FutureLocal.java 添加到扩展 CompletableFuture 的自定义 Future.java 中? (下面的示例代码)

转载 作者:行者123 更新时间:2023-12-01 16:28:29 28 4
gpt4 key购买 nike

我有以下代码,除了当我调用 super.thenCompose 时,它​​有点接近,它返回一个 CompletableFuture 而不是我的 Custom Future.java,这很关键。我正在尝试复制 twitter 的 scala futures

  1. 能够像 twitter scala 的 futures 一样添加取消链
  2. 可以让请求上下文流通过 thenApply 和 thenCompose 链来修复 slf4j 中的 MDC(很像 ThreadLocal,但它会在每个 lambda 运行之前重新应用,如下面的代码所示)

    公共(public)类 Future 扩展了 CompletableFuture {

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    Map<String, Object> state = FutureLocal.fetchState();
    MyFunction f = new MyFunction(state, fn);

    return super.thenApply(f);
    }

    @Override
    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
    Map<String, Object> state = FutureLocal.fetchState();
    MyFunction f = new MyFunction(state, fn);

    return super.thenCompose(f);
    }

    @SuppressWarnings("hiding")
    private class MyFunction implements Function {

    private Map<String, Object> state;
    private Function fn;

    public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
    this.state = state;
    this.fn = fn;

    }

    @Override
    public Object apply(Object t) {

    try {
    FutureLocal.restoreState(state);

    return fn.apply(t);

    } finally {
    FutureLocal.restoreState(null);
    }


    }

    }

    @Override
    public boolean complete(T value) {
    return super.complete(value);
    }

    @Override
    public boolean completeExceptionally(Throwable ex) {
    return super.completeExceptionally(ex);
    }

    }

这是我用来运行该代码的一些代码,但在 map 中记录“测试”在第三次远程调用时开始失败,这意味着 slf4j MDC 将崩溃。

public class TestCustomFutures {

private Executor exec = Executors.newFixedThreadPool(3);

@Test
public void testFutureContext() throws InterruptedException, ExecutionException {

Set<Integer> hashSet = new HashSet<Integer>();

FutureLocal.put("test", 100);

CompletableFuture<Integer> f = myRemoteCall(4)
.thenCompose(s -> myRemoteCall(3))
.thenCompose(s -> myRemoteCall(2));

f.get();
}

private Future<Integer> myRemoteCall(int i) {
System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());

Future<Integer> f = new Future<Integer>();

exec.execute(new Runnable() {

@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
f.completeExceptionally(e);
}

f.complete(i);
}
});

return f;
}
}

然后输出是这样的

result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2

注意最后一个值我们不希望为空

最佳答案

啊啊,我错过了一件简单的事情,因为我在 jdk8 中。然而在 jdk11 中,你可以覆盖这个...

@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new Future<U>();
}

在 jdk8 中,由于某种原因,这不会编译,也不会调用这个:(。糟糕,我还不想升级到 11,因为 jdk8 上仍然有一些用法:(。

关于java - 有没有一种好方法将 FutureLocal.java 添加到扩展 CompletableFuture 的自定义 Future.java 中? (下面的示例代码),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62109484/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com