gpt4 book ai didi

rx-java - 如何在不丢失发出的项目的情况下暂停 Observable?

转载 作者:行者123 更新时间:2023-12-04 23:41:41 32 4
gpt4 key购买 nike

我有一个 Observable每秒发出滴答声:

Observable.interval(0, 1, TimeUnit.SECONDS)
.take(durationInSeconds + 1));

我想暂停这个 Observable 以便它停止发出数字,并根据需要恢复它。

有一些问题:
  • 根据 Observable Javadoc,interval运算符不支持背压
  • RxJava wiki 关于 backpressure有一节关于调用堆栈阻塞作为背压的流量控制替代方案:

  • Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.



    有没有办法暂停 interval可观察?或者我应该使用一些背压支持来实现我自己的“滴答” Observable 吗?

    最佳答案

    有很多方法可以做到这一点。例如,您仍然可以使用 interval()并保持两个额外的状态:说 bool 标志“暂停”和一个计数器。

    public static final Observable<Long> pausableInterval(
    final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

    final AtomicLong counter = new AtomicLong();
    return Observable.interval(initial, interval, unit, scheduler)
    .filter(tick -> !paused.get())
    .map(tick -> counter.getAndIncrement());
    }

    然后你只需在某处调用 paused.set(true/false) 来暂停/恢复

    编辑 2016-06-04

    上面的解决方案有一个小问题。
    如果我们多次重用 observable 实例,它将从上次取消订阅时的值开始。例如:
    Observable<Long> o = pausableInterval(...)
    List<Long> list1 = o.take(5).toList().toBlocking().single();
    List<Long> list2 = o.take(5).toList().toBlocking().single();

    虽然 list1 预期为 [0,1,2,3,4],list2 实际上为 [5,6,7,8,9]。
    如果不希望出现上述行为,则必须将 observable 设为无状态。这可以通过 scan() 运算符来实现。
    修订后的版本可能如下所示:
      public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
    final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
    .filter(tick->!pause.get())
    .scan((acc,tick)->acc + 1);
    }

    或者,如果您不希望依赖 Java 8 和 lambdas,您可以使用 Java 6+ 兼容代码执行以下操作:

    https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

    关于rx-java - 如何在不丢失发出的项目的情况下暂停 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35782767/

    32 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com