gpt4 book ai didi

unit-testing - 如何测试 observable 在 RxJava 中使用正确的调度程序?

转载 作者:行者123 更新时间:2023-12-04 12:05:47 37 4
gpt4 key购买 nike

我有以下类(class)(简化):

public class Usecase<T> {
private final Observable<T> get;
private final Scheduler observeScheduler;

public Usecase(Observable<T> get, Scheduler observeScheduler) {
this.get = get;
this.observeScheduler = observeScheduler;
}

public Observable<T> execute() {
return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler);
}
}

我正在为它编写单元测试。我如何测试 subscribeOnobserveOn用正确的值调用了吗?

我尝试以下操作:
    Observable<String> observable = mock(Observable.class);
Usecase<String> usecase = new Usecase(observable, Schedulers.computation());
usecase.execute();

verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes
verify(observable).observeOn(Schedulers.computation()); // should pass, but fails: Missing method call for verify(mock) here

以上失败(我认为)因为 subscribeOnobserveOnfinal方法。那么可能有其他方法可以确保可观察对象使用正确的调度程序吗?

最佳答案

There is a way间接访问 observable 正在运行和被观察的两个线程,这意味着您实际上可以验证 Observable使用正确的调度程序。

我们仅限于按名称验证线程。幸运的是,Schedulers.io() 使用的线程以我们可以匹配的一致前缀命名。这是具有唯一前缀的调度程序的(完整?)列表以供引用:

  • Schedulers.io() - RxCachedThreadScheduler
  • Schedulers.newThread() - RxNewThreadScheduler
  • Schedulers.computation() - RxComputationThreadPool

  • 验证 Observable 是 订阅 IO 线程 :

    // Calling Thread.currentThread() inside Observer.OnSubscribe will return the
    // thread the Observable is running on (Schedulers.io() in our case)
    Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
    s.onNext(Thread.currentThread().getName());
    s.onCompleted();
    })

    // Schedule the Observable
    Usecase usecase = new Usecase(obs, Schedulers.immediate());
    Observable usecaseObservable = usecase.execute();

    // Verify the Observable emitted the name of the IO thread
    String subscribingThread = usecaseObservable.toBlocking().first();
    assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");

    验证 Observable 是 观察到 计算线程 ,您可以使用 TestSubscriber#getLastSeenThread访问用于观察的最后一个线程。

    TestSubscriber<Object> subscriber = TestSubscriber.create();
    UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation())
    usecase.execute().subscribe(subscriber);

    // The observable runs asynchronously, so wait for it to complete
    subscriber.awaitTerminalEvent();
    subscriber.assertNoErrors();

    // Verify the observable was observed on the computation thread
    String observingThread = subscriber.getLastSeenThread().getName();
    assertThat(observingThread).startsWith("RxComputationThreadPool");

    尽管我使用的是 AssertJ,但不需要第三方库或模拟。流利的 startsWith断言。

    关于unit-testing - 如何测试 observable 在 RxJava 中使用正确的调度程序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36501358/

    37 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com