gpt4 book ai didi

java - BehaviorSubject 订阅另一个线程

转载 作者:搜寻专家 更新时间:2023-11-01 03:46:04 25 4
gpt4 key购买 nike

我是 RxJava 的新手,我选择使用它是因为我认为它非常适合我的用例。

我有一些 Integer 值,我想在无限的时间段内观察。每当这些值之一发生变化(即一个事件)时,我希望在另一个线程上调用它的所有观察者。

由于观察时间长的要求,我认为我需要使用 BehaviorSubject 类(尽管最初我认为 Observable 是我所需要的全部......看到我只是需要“观察”),我可以使用 subscribeOn() 方法来设置调度程序,从而实现在后台线程上调用订阅者:

private BehaviorSubject<Integer> rotationPositionSubject = BehaviorSubject.createDefault(getRotorPosition());
rotationPositionSubject.subscribeOn(scheduler);

我有一个 rotate() 方法用于更新 rotationPositionSubject ,它将从主线程调用:

@Override
public synchronized int rotate()
{
final int newRotorPosition = super.rotate();
rotationPositionSubject.onNext(newRotorPosition);

return newRotorPosition;
}

但是,通过上面的代码,我发现订阅者是在“主”线程上调用的。检查 subscribeOn() 的文档:

Returns:

the source ObservableSource modified so that its subscriptions happen on the specified Scheduler

所以我上面的代码不会工作,因为我没有使用返回的 ObservableSource,但是返回对象是一个 Observable,这对我的应用程序没有用?

那么问题是,我如何使用 RxJava 在后台线程上长期观察任何对象并调用订阅者,或者 RxJava 是错误的选择吗?

最佳答案

经过一些实验后,似乎在使用 BehaviorSubject 对象时需要小心,而且它们的使用并不像我从各种接口(interface)的名称中推断的那样明显。

作为演示我当前正在做的测试方法:

@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
rotorBehaviour.subscribeOn(Schedulers.single());
rotorBehaviour.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});


rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);

}

这会导致不希望的结果:

Executing test on thread ID: 1
onSubscribe() called on thread ID: 1
onNext() called on thread ID: 1
onNext() called on thread ID: 1

Process finished with exit code 0

(不需要,因为在主线程上调用了 onNext())

修改代码以使用从对 subscribeOn 的调用返回的 Observable 会产生相同的意外结果:

@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
Observable<Integer> rotorObservable = rotorBehaviour.subscribeOn(Schedulers.single());

rotorObservable.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}

结果:

Executing test on thread ID: 1
onSubscribe() called on thread ID: 1
onNext() called on thread ID: 1
onNext() called on thread ID: 1

Process finished with exit code 0

但是使用 observeOn() 方法确实给出了预期的结果:

@Test
public void test()
{
System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
Observable<Integer>rotorObservable = rotorBehaviour.observeOn(Schedulers.single());

rotorObservable.subscribe(new Observer<Integer>()
{
@Override
public void onSubscribe(final Disposable d)
{
System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onNext(final Integer integer)
{
System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onError(final Throwable e)
{
System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
}

@Override
public void onComplete()
{
System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
}
});
rotorBehaviour.onNext(1);
rotorBehaviour.onNext(2);
}

Executing test on thread ID: 1
onSubscribe() called on thread ID: 1
onNext() called on thread ID: 13
onNext() called on thread ID: 13

Process finished with exit code 0

此外,在所有示例中,我仍在使用 BehaviorSubject 对象来启动事件,我只是偶然发现这会产生所需的结果。

让我担心的是,我可能以不正确的方式使用了 ObservableBehaviorSubject,只是“碰巧”给我正确的结果,即订阅者被调用后台线程。除非我在文档中的某处遗漏了它,否则如何使用这些对象获得所需结果似乎并不明显。

关于java - BehaviorSubject 订阅另一个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54060688/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com