
java - Ratpack's Promise.cache with multiple downstream promises in ParallelBatch

Reposted. Author: 行者123. Updated: 2023-11-30 02:08:19

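I'm encountering a NullPointerException in the guts of Ratpack when using Ratpack's Promise.cache in combination with multiple downstream promises and a ParallelBatch. It is not clear to me from the documentation whether my usage is incorrect or whether this is a bug in Ratpack.

Here is a reduced test case that demonstrates the problem: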

@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();

    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }

    final List<Integer> results = ExecHarness.yieldSingle(c ->
        ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}

Running this test 10000 times locally gives a failure rate of about 10/10000, with a NullPointerException that looks like this:
java.lang.NullPointerException
at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
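
A failure rate like the one quoted above can be measured with a small repeat loop. The following is a minimal stdlib-only sketch, not part of the original question: `FlakyRunCounter` and `countFailures` are hypothetical names, and the stand-in task in `main` only simulates a failure on every 1000th call. To measure the real rate, the stand-in would be replaced by the body of the test.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not a Ratpack API): runs a task repeatedly and counts failing runs. */
final class FlakyRunCounter {

    /** Returns how many of the given number of runs ended with an exception or error. */
    static int countFailures(int runs, Callable<?> task) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            try {
                task.call();
            } catch (Throwable t) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-in task: throws on every 1000th call to simulate an intermittent failure.
        int[] calls = {0};
        int failures = countFailures(10_000, () -> {
            if (++calls[0] % 1000 == 0) {
                throw new NullPointerException("simulated");
            }
            return null;
        });
        System.out.println(failures + "/10000 runs failed");
    }
}
```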

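Not using cache in this test case makes the problem go away, as does not subscribing to each cached promise twice.

My question is: is this incorrect usage of Ratpack's API, or does it represent a bug in the framework? If the former, can you point me to something in the documentation that explains why this usage is wrong?

Best answer

Even though your example is not the best use case for caching promises (re-creating and caching a promise that holds the same value in every iteration step does not make much sense), you have actually found a race condition bug in the CachingUpstream<T> class.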

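I did some experiments to figure out what is happening, and here are my findings. First, I created a promise of the value 12 that provides a custom (more verbose) implementation of the CachingUpstream<T> object. I took the body of Promise.value(12) and overrode the inline method cacheResultIf(Predicate<? super ExecResult<T>> shouldCache), which by default returns a CachingUpstream<T> instance: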

Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
    continuation.resume(() -> down.success(12))
)) {
    @Override
    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
        return transform(up -> {
            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
        });
    }
};

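Next I created a class TestCachingUpstream<T> by simply copying the body of the original class, and I added a few things:
  • Each TestCachingUpstream<T> has an internal ID (a random UUID) to make it easier to trace the execution of a promise.
  • I added some verbose log messages that are printed when specific things happen during the execution of a promise.

I did not change the implementation of the methods. I only wanted to trace the execution flow and keep the original implementation as it is. My custom class looks like this: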
    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();

        private Upstream<? extends T> upstream;

        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;

        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }

        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }

        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();

                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }

            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }

        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }

        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }

                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }

                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }

        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }

        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }

            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }

            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);

            downstream.accept(result);

            tryDrain();
        }

        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;

            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }

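I reduced the number of steps in the for loop from 25 to 3 to make the console output more concise.

Successful test execution (no race condition)

Let's look at what the flow of a correct execution looks like: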
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...

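As you can see, each iteration causes the cached promise to produce 5 console log lines.
  • When tryDrain is called for the first time, there is no cached result, so it falls through to the yield(downstream) call.
  • The call to yield(downstream) completes successfully, and receiveResult(downstream, ExecResult.of(Result.success(value))) is called from inside the success callback.
  • Promise.cache() uses an infinite expiration date by passing a negative duration, which is why receiveResult() releases the upstream object by setting its value to null.
  • Before it completes, receiveResult() stores the cached result in the internal ref object and calls tryDrain() before exiting the method.
  • tryDrain() sees the previously cached result on the next call of the cached promise (p.map(v -> v + 2)), so it passes the cached result directly to the downstream.

This scenario repeats for all 3 promises created inside the for loop.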
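
The way the TTL decides whether a cached result is reused can be isolated from Ratpack. The sketch below mirrors only the expiry branch of receiveResult() and the needsFetch() check from the class above, using a fixed clock so the outcome is deterministic. `TtlSketch`, `expiresAt` and `needsFetchAfterCaching` are hypothetical names introduced for illustration, and the boolean `hasResult` stands in for the `cached == null` test.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical stdlib-only mirror of the TTL handling shown above. */
final class TtlSketch {

    /** Mirrors the expiry branch of receiveResult(): null means the entry never expires. */
    static Instant expiresAt(Duration ttl, Clock clock) {
        if (ttl.isNegative()) {
            return null; // eternal, this is what Promise.cache() ends up with
        } else if (ttl.isZero()) {
            return clock.instant().minus(Duration.ofSeconds(1)); // already expired
        } else {
            return clock.instant().plus(ttl);
        }
    }

    /** Mirrors needsFetch(); hasResult == false stands for "cached == null". */
    static boolean needsFetch(boolean hasResult, Instant expireAt, Clock clock) {
        return !hasResult || (expireAt != null && expireAt.isBefore(clock.instant()));
    }

    /** Caches a result with the given TTL and reports whether the next caller must fetch again. */
    static boolean needsFetchAfterCaching(long ttlSeconds) {
        Clock clock = Clock.fixed(Instant.parse("2018-06-12T00:00:00Z"), ZoneOffset.UTC);
        Instant expireAt = expiresAt(Duration.ofSeconds(ttlSeconds), clock);
        return needsFetch(true, expireAt, clock);
    }

    public static void main(String[] args) {
        System.out.println("ttl=-1s -> needsFetch=" + needsFetchAfterCaching(-1)); // false
        System.out.println("ttl=0s  -> needsFetch=" + needsFetchAfterCaching(0));  // true
        System.out.println("ttl=60s -> needsFetch=" + needsFetchAfterCaching(60)); // false
    }
}
```

With a negative TTL the entry is eternal, so once a result has been stored no later caller should ever need the upstream again. That is the assumption under which setting upstream to null is safe.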

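Failed test execution (race condition)

Running the test with these System.out.printf() calls made it fail less often, mostly because the I/O operation consumes some CPU cycles, which gives the unsynchronized part of the code more time to avoid the race condition. It still happens, though, so let's see what the output of a failed test looks like: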
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null...
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null})

    java.lang.NullPointerException
    at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
    at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
    at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

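This is the output of the failing test. I ran it in IntelliJ IDEA with the test configured to repeat until failure. It took a while to make the test fail, but after running it a few times it finally failed at around iteration 1500. In this case we can see that the race condition happened for the first promise created in the for loop. You can see that after the upstream object was released inside the receiveResult() method
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})

and tryDrain was called before exiting that method, the next execution of the cached promise did not yet see the previously cached result, so it ran into the yield(downstream) method again. By then the upstream object had already been released by setting its value to null, and yield(downstream) expects the upstream object to be correctly initialized; otherwise it throws an NPE.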
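
One interleaving that is consistent with the log above can be replayed by hand on a single thread. This is a sketch under assumptions, not a confirmed diagnosis: it assumes the second thread read ref while it was still empty, and only afterwards won the pending flag once the first thread had published the result and released the upstream. `StaleReadReplay` is a hypothetical name, and the `recheckAfterClaim` variant is a hypothetical mitigation for illustration, not the fix applied in Ratpack.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical single-threaded replay of a check-then-act race on ref and pending. */
final class StaleReadReplay {
    private final AtomicReference<Integer> ref = new AtomicReference<>(); // cached result
    private final AtomicBoolean pending = new AtomicBoolean();            // fetch in flight
    private Object upstream = new Object();                               // nulled when released

    /** Returns what "thread B" ends up doing for the assumed interleaving. */
    static String replay(boolean recheckAfterClaim) {
        StaleReadReplay s = new StaleReadReplay();

        s.pending.set(true);            // thread A: a fetch is in flight
        Integer seenByB = s.ref.get();  // thread B: reads the cache, sees null

        s.upstream = null;              // thread A: receiveResult() releases the upstream
        s.ref.set(12);                  // thread A: publishes the cached result
        s.pending.set(false);           // thread A: clears the pending flag

        // Thread B now acts on its stale read and wins the pending flag.
        if (seenByB == null && s.pending.compareAndSet(false, true)) {
            if (recheckAfterClaim && s.ref.get() != null) {
                s.pending.set(false);   // a result arrived in the meantime, no fetch needed
                return "served from cache";
            }
            return s.upstream == null ? "NPE: upstream is null" : "fetched";
        }
        return "served from cache";
    }

    public static void main(String[] args) {
        System.out.println("without re-check: " + replay(false));
        System.out.println("with re-check:    " + replay(true));
    }
}
```

Without the re-check, the replay ends in the same state as the failing log: a fetch is attempted although the upstream is already null. Reading ref again after claiming pending avoids that outcome in this particular interleaving.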

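I tried to debug this method:
    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }

This is the method that decides whether the cached promise needs to be fetched. However, as soon as I added any logging statement to it, it started causing a StackOverflowError. My guess is that in some rare cases cached.expireAt.isBefore(clock.instant()) returns false, because the cached object comes from an AtomicReference, so this object should be passed correctly between method executions.

Here is the full test class I used in my experiments: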
    import com.google.common.annotations.VisibleForTesting;
    import io.netty.util.internal.PlatformDependent;
    import org.junit.Test;
    import ratpack.exec.*;
    import ratpack.exec.internal.CompleteExecResult;
    import ratpack.exec.internal.DefaultExecution;
    import ratpack.exec.internal.DefaultPromise;
    import ratpack.exec.util.ParallelBatch;
    import ratpack.func.Function;
    import ratpack.func.Predicate;
    import ratpack.test.exec.ExecHarness;

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    public class AnotherPromiseTest {

        @Test
        public void foo() throws Exception {
            List<Promise<Integer>> promises = new ArrayList<>();

            for (int i = 0; i < 3; i++) {
                Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                    continuation.resume(() -> down.success(12))
                )) {
                    @Override
                    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                        return transform(up -> {
                            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                        });
                    }
                };

                p = p.cache();
                promises.add(p.map(v -> v + 1));
                promises.add(p.map(v -> v + 2));
            }

            ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
        }

        private static class TestCachingUpstream<T> implements Upstream<T> {
            private final String id = UUID.randomUUID().toString();

            private Upstream<? extends T> upstream;

            private final Clock clock;
            private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
            private final Function<? super ExecResult<T>, Duration> ttlFunc;

            private final AtomicBoolean pending = new AtomicBoolean();
            private final AtomicBoolean draining = new AtomicBoolean();
            private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

            public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
                this(upstream, ttl, Clock.systemUTC());
            }

            @VisibleForTesting
            TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
                this.upstream = upstream;
                this.ttlFunc = ttl;
                this.clock = clock;
            }

            private void tryDrain() {
                if (draining.compareAndSet(false, true)) {
                    try {
                        TestCachingUpstream.Cached<? extends T> cached = ref.get();
                        if (needsFetch(cached)) {
                            if (pending.compareAndSet(false, true)) {
                                Downstream<? super T> downstream = waiting.poll();

                                System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);

                                if (downstream == null) {
                                    pending.set(false);
                                } else {
                                    try {
                                        yield(downstream);
                                    } catch (Throwable e) {
                                        System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                        receiveResult(downstream, ExecResult.of(Result.error(e)));
                                    }
                                }
                            }
                        } else {
                            System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                            Downstream<? super T> downstream = waiting.poll();
                            while (downstream != null) {
                                downstream.accept(cached.result);
                                downstream = waiting.poll();
                            }
                        }
                    } finally {
                        draining.set(false);
                    }
                }

                if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                    tryDrain();
                }
            }

            private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
                return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
            }

            private void yield(final Downstream<? super T> downstream) throws Exception {
                System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
                upstream.connect(new Downstream<T>() {
                    public void error(Throwable throwable) {
                        System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                    }

                    @Override
                    public void success(T value) {
                        System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, ExecResult.of(Result.success(value)));
                    }

                    @Override
                    public void complete() {
                        System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, CompleteExecResult.get());
                    }
                });
            }

            @Override
            public void connect(Downstream<? super T> downstream) throws Exception {
                TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
                if (needsFetch(cached)) {
                    Promise.<T>async(d -> {
                        waiting.add(d);
                        tryDrain();
                    }).result(downstream::accept);
                } else {
                    downstream.accept(cached.result);
                }
            }

            private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
                Duration ttl = Duration.ofSeconds(0);
                try {
                    ttl = ttlFunc.apply(result);
                } catch (Throwable e) {
                    if (result.isError()) {
                        //noinspection ThrowableResultOfMethodCallIgnored
                        result.getThrowable().addSuppressed(e);
                    } else {
                        result = ExecResult.of(Result.error(e));
                    }
                }

                Instant expiresAt;
                if (ttl.isNegative()) {
                    expiresAt = null; // eternal
                    System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                    upstream = null; // release
                } else if (ttl.isZero()) {
                    expiresAt = clock.instant().minus(Duration.ofSeconds(1));
                } else {
                    expiresAt = clock.instant().plus(ttl);
                }

                ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
                pending.set(false);

                downstream.accept(result);

                tryDrain();
            }

            static class Cached<T> {
                final ExecResult<T> result;
                final Instant expireAt;

                Cached(ExecResult<T> result, Instant expireAt) {
                    this.result = result;
                    this.expireAt = expireAt;
                }
            }
        }
    }

Hope it helps.

Regarding java - Ratpack's Promise.cache with multiple downstream promises in ParallelBatch, the original question can be found on Stack Overflow: https://stackoverflow.com/questions/50824081/
