gpt4 book ai didi

rx-java - 在 RxJava 中公开 "expensive"Observables 的最佳实践

转载 作者:行者123 更新时间:2023-12-05 01:26:56 25 4
gpt4 key购买 nike

我是 RxJava 的新手,正在尝试确定常见的习语和最佳实践。

假设我有一个 Foo 类,它发出 Bar(目前不完整且过于简单):

class Foo {
public Subscriber barSubscriber;
public Observable<Bar> getBarObservable = (...details omitted...)

private void someMethod() {
// emit a Bar
barSubscriber.onNext(bar);
}
}

其他想要订阅那些Bars的对象通过调用

foo.getBarObservable().subscribe(...);

假设生成和发出 Bar 是“昂贵的”。为了避免在没有更多订阅者时执行此操作,Foo 的 getBarObservable 可以像这样公开一个可连接的、引用计数的 Observable(使用 share() ):

class Foo {
private Subscriber barSubscriber;
private Observable<Bar> barObservable = Observable.create(
new Observable.OnSubscribe<Bar>() {
@Override
public void call(Subscriber<? super Bar> subscriber) {
Foo.this.subscriber = subscriber;
new Thread(new Runnable() {
@Override
public void run() {
runUntilUnsubscribed();
}
}).start();

}
}
).share();

public Observable<Bar> getBarObservable() {
return barObservable;
}

public void runUntilUnsubscribed(} {
while(!subscriber.isUnsubscribed()) {

/* do some heavy stuff that produces a Bar. If, when a
Bar is ready, we still have subscribers, emit the Bar */

if (!subscriber.isUnsubscribed())
subscriber.onNext(bar);
}
}
}

我见过的大多数示例和教程都是在订阅它们的同一 block 代码中即时创建 Observable 的,所以我不清楚标准做法是什么在更真实的场景中,Observable 的创建和对它的订阅在两个不同的地方。

  1. 对于像 Foo 这样不想知道它的订阅者是谁或有多少订阅者的类,这是正确的方法吗?
  2. 在我看来,这将是一个非常典型的场景 - 是吗?或者,在更高层次上,这不是考虑公开 Observable 的正确方法吗?常规使用这种方法有什么缺点吗?
  3. 在我看来我需要那个小 if (subscriber == null && !subscriber.isUnsubscribed())
    subscriber.onNext(bar);
    每次我想发出 Bar 时的模式。这也是一个常见的习语,还是有更好的方法?
    没关系,我不需要空检查...,不确定我当时在想什么。

最佳答案

您的示例类无法正常工作:如果订阅者为 nullsetBar 可能会抛出 NPE,runUntilUnsubscribed 引用了一个缺失的 bar 字段/value 并且是一个繁忙的循环,会一遍又一遍地发出相同的值。

你说创建一个 Bar 很昂贵,但它的创建似乎在 Foo 类之外,我猜你想将这样的值分派(dispatch)给当前订阅的订户。这就是 PublishSubject 的用途:

class Foo {
final PublishSubject<Bar> subject = PublishSubject.create();
public void setBar(Bar bar) {
subject.onNext(bar);
}
public Observable<Bar> getBarObservable() {
return subject; // .asObservable() if you want to hide the subject
}
}

如果没有任何订阅者,则 bar set 将掉落并被垃圾收集。如果您想保留最后一个值,请使用 BehaviorSubject 而不是 PublishSubject

否则,如果您需要在订阅者到达时触发昂贵的 Bar 值的创建,您可以使用一些带有 share() 的启动序列:

Observable.just(1)
.subscribeOn(Schedulers.computation())
.map(v -> createBar())
.share();

但是 share() 的使用实际上取决于每个 Bar 值的预期生命周期。

例如,如果您想在订阅者到达之前存储柱状图,然后进行一次繁重的计算并发送结果,您可以采用以下结构:

class Foo {
final BehaviorSubject<Bar> subject = BehaviorSubject.create();
final Observable<Bar> output = subject
.observeOn(Schedulers.computation())
.doOnNext(bar -> expensiveInplaceComputation(bar))
.take(1)
.share();

public void setBar(Bar bar) {
subject.onNext(bar);
}
public Observable<Bar> getBarObservable() {
return output;
}
}

参见 this gist一个可运行的例子。

关于rx-java - 在 RxJava 中公开 "expensive"Observables 的最佳实践,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30397005/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com