- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我是 RxJava 的新手,我选择使用它是因为我认为它非常适合我的用例。
我有一些 Integer
值,我想在无限的时间段内观察。每当这些值之一发生变化(即一个事件)时,我希望在另一个线程上调用它的所有观察者。
由于观察时间长的要求,我认为我需要使用 BehaviorSubject
类(尽管最初我认为 Observable
是我所需要的全部......看到我只是需要“观察”),我可以使用 subscribeOn()
方法来设置调度程序,从而实现在后台线程上调用订阅者:
private BehaviorSubject<Integer> rotationPositionSubject = BehaviorSubject.createDefault(getRotorPosition());
rotationPositionSubject.subscribeOn(scheduler);
我有一个 rotate()
方法用于更新 rotationPositionSubject
,它将从主线程调用:
@Override
public synchronized int rotate()
{
final int newRotorPosition = super.rotate();
rotationPositionSubject.onNext(newRotorPosition);
return newRotorPosition;
}
但是,通过上面的代码,我发现订阅者是在“主”线程上调用的。检查 subscribeOn() 的文档:
Returns:
the source ObservableSource modified so that its subscriptions happen on the specified Scheduler
所以我上面的代码不会工作,因为我没有使用返回的 ObservableSource,但是返回对象是一个 Observable
,这对我的应用程序没有用?
那么问题是,我如何使用 RxJava 在后台线程上长期观察任何对象并调用订阅者,或者 RxJava 是错误的选择吗?
最佳答案
经过一些实验后,似乎在使用 BehaviorSubject
对象时需要小心,而且它们的使用并不像我从各种接口(interface)的名称中推断的那样明显。
作为演示我当前正在做的测试方法:
@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
rotorBehaviour.subscribeOn(Schedulers.single());
rotorBehaviour.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}
这会导致不希望的结果:
Executing test on thread ID: 1
onSubscribe() called on thread ID: 1
onNext() called on thread ID: 1
onNext() called on thread ID: 1Process finished with exit code 0
(不需要,因为在主线程上调用了 onNext()
)
修改代码以使用从对 subscribeOn
的调用返回的 Observable
会产生相同的意外结果:
@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
Observable<Integer> rotorObservable = rotorBehaviour.subscribeOn(Schedulers.single());
rotorObservable.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}
结果:
Executing test on thread ID: 1
onSubscribe() called on thread ID: 1
onNext() called on thread ID: 1
onNext() called on thread ID: 1Process finished with exit code 0
但是使用 observeOn()
方法确实给出了预期的结果:
@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
Observable<Integer>rotorObservable = rotorBehaviour.observeOn(Schedulers.single());
rotorObservable.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}
@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}
Executing test on thread ID: 1
onSubscribe() called on thread ID: 1
onNext() called on thread ID: 13
onNext() called on thread ID: 13Process finished with exit code 0
此外,在所有示例中,我仍在使用 BehaviorSubject
对象来启动事件,我只是偶然发现这会产生所需的结果。
让我担心的是,我可能以不正确的方式使用了 Observable
和 BehaviorSubject
,只是“碰巧”给我正确的结果,即订阅者被调用后台线程。除非我在文档中的某处遗漏了它,否则如何使用这些对象获得所需结果似乎并不明显。
关于java - BehaviorSubject 订阅另一个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54060688/
我有一个 rxjs@6 BehaviorSubject source$ ,我想从 source$ 获取子值 const source$ = new BehaviorSubject(someValu
场景我有一个 Territories 列表,我想从服务器获取每个 Territory 中的所有 Items。因此,我决定使用 BehaviorSubject 来尝试一下。我不知道我是不是“疯了”。这就
至少在移动应用程序中,BehaviorSubject 被频繁使用,为属性建模——它有一个当前值,可以随时查询并观察。 有时它只想转换 BehaviorSubject 而无需订阅它。例如。如果有类,它充
我在使用 Angular 服务时遇到了一个由 BehaviorSubject 引起的错误。 错误 Module '"ng5/node_modules/rxjs/BehaviorSubject"' ha
我正在使用 Angular 2 和 RxJS 5。 这两者有什么区别吗: 应该首先使用哪个?谢谢 isOpen$ = new BehaviorSubject(true); 和 isOpen$ = Be
我正在查看可观察对象的接口(interface),发现您可以将实现 PartialObserver 的任何内容传递给订阅函数。所以我用 BehaviorSubject 做到了。 像这样(一) sour
我的其中一个组件中有这个: public booleanSubject: BehaviorSubject = new BehaviorSubject(false); 当我添加 "strictFunct
您好,我有一个具有简单类型 int 的 BehaviorSubject,我将值 5 添加到它,然后添加另一个值 5。流监听器向我发送了两个事件。如果值等于最后一个值,如何强制检查值并且不发送事件。示例
在我们的单页应用程序中,我们开发了一个集中式存储类,该类使用 RxJS 行为主体来处理应用程序的状态及其所有变化。我们应用程序中的几个组件正在订阅我们商店的行为主题,以便接收对当前应用程序状态的任何更
我有一个 Angular 服务,它有一个名为 must_checkout 的 bool 属性。我的服务还包含一个名为 observable_must_checkout 的属性,它是一个调查 must_
我有一个 BehaviorSubject我希望能够filter ,但要保持新订阅者在订阅时始终获得一个值的行为主题式质量,即使最后发出的值被过滤掉。有没有一种简洁的方法可以使用 rxjs 的内置函数来
我有一个 BehaviorSubject我想重置 - 我的意思是我希望最新的值不可用,就像它刚刚创建一样。 我似乎没有看到可以执行此操作的 API,但我想还有另一种方法可以实现相同的结果吗? 我想要的
我使用 RxJS 中的 BehaviourSubject: private rights = new BehaviorSubject>([]); updateRights(rights: Array)
我几乎已经完成了 android,但对 java 8 还是陌生的。我制作了一个 BehaviourSubject>。 我在上面放置了一个可观察对象。在 Presenter 类中,我将 Behaviou
在我的 Angular 应用程序中 我多次尝试使用 BehaviorSubject 获取值,以了解值何时更改或接收。我无法在加载组件之前获取值。 通过此链接,您可以查看返回的内容: https://d
我有一个函数应该返回一个BehaviorSubject。该主题旨在返回 Profile 的最新版本 (user)Profile 只是一个包含对三个成员的引用的 POJO: - 用户, - 该用户的 M
通缉行为: subject = BehaviorSubject.create(1); subject.subscribe(number -> print(number), error -> print
我是 RxJava 的新手,我选择使用它是因为我认为它非常适合我的用例。 我有一些 Integer 值,我想在无限的时间段内观察。每当这些值之一发生变化(即一个事件)时,我希望在另一个线程上调用它的所
我有一个 EvaluateHistoryItem[] 类型的 BehaviourSubject history$。在 uploadFile 方法中,我用逗号分割上传的 .txt 文件,然后将它们推送到
我是 Angular 的新手,不知道如何使用异步调用来更新响应式表单。 我有一个基于对象模型的 react 形式。表单中的任何更改都会触发 HTTP 请求,该请求可能会发回对象的更新版本作为响应。我需
我是一名优秀的程序员,十分优秀!