gpt4 book ai didi

java - 为 Observable 提供值(value)

转载 作者:行者123 更新时间:2023-12-01 12:29:15 25 4
gpt4 key购买 nike

我是 rxjava 新手,遇到以下问题:

外部系统会不定期地将对象放入 FIFO 队列中。我需要一个每秒运行一次的 Observable,从队列中获取一个项目(如果有的话)并将其发送给订阅者。

两个问题:

  • 队列项是在 Observable 处于 Activity 状态时生成的,不可能预先提供所有项。队列可能会空,在这种情况下,Observable 必须等待并且不发出任何东西。 (如果 Observable 在暂停后队列中的某个项目变得可用时立即启动,那就太好了,但是如果我们不想更频繁地轮询,那么队列可能也需要是一个 Observable,不想法如何。)

  • 外部系统必须能够完成 Observable。我可以设置一个变量并从 Observable 中读取它,但我想知道是否有更优雅的方法来做到这一点。

    LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue
    boolean stopObservable = false; // the variable to stop the observable

    Observable.create(new Observable.OnSubscribe<Layer>() {

    @Override public void call(Subscriber<? super Layer> subscriber) {
    try {
    if (!queue.isEmpty()) {
    Layer layer = queue.poll();
    subscriber.onNext(layer);
    } else {
    if (stopObservable) { subscriber.onCompleted(); }
    }
    } catch (Exception e) {
    subscriber.onError(e);
    }
    }

    }).somethingThatCreatesTheInterval().subscribeOnEtc.

对于间隔,我不能使用 .sample(),因为它会丢弃项目,并且发出所有项目很重要。

.throttleWithTimeout() 看起来更好,但它似乎也会丢弃项目。

rx 非常酷,但很难入门。任何意见表示赞赏。

最佳答案

当我需要定期轮询外部 Web 服务时,我做了类似的事情。

  1. 对于时间间隔,您可以继续使用 timer ;在每个以 1 秒为单位的刻度上,可观察链将轮询并可能选择一层,如果该层为空,则不会发出任何内容

    Observable.timer(0, 1, TimeUnit.SECOND)
    .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null))
    .subscribe(layer -> System.out.format("The layer is : %s", layer));
  2. 现在,如果您想中止整个链,您可以添加 takeUntil 。因此,当您的外部系统想要停止时,它将在 stopObservable 中提交一些内容,这将停止后续订阅:

    // somewhere before
    PublishSubject stopNotifier = PublishSubject.create();

    // somewhere process the queue
    Observable.timer(0, 1, TimeUnit.SECOND)
    .takeUntil(stopNotifier)
    .flatMap(tick -> Observable.just(queue.poll()))
    .subscribe(layer -> System.out.format("The layer is : %s", layer));

    // when not anymore interested (calling onComplete works too)
    stopNotifier.onNext("cancel everything about the queue");

我是通过平板电脑写此回复,因此您可能会认为我可能拼错了一些单词或犯了幼稚的编程错误;)

关于java - 为 Observable 提供值(value),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26080646/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com