gpt4 book ai didi

android - 一种在 RxJava 中定义订阅者顺序的方法?

转载 作者:行者123 更新时间:2023-11-29 01:42:38 25 4
gpt4 key购买 nike

我正在寻找一种方法来定义观察者的顺序(?)。

@GET("/get_user_msgs")
Observable<PrivateMessagesResponse> getPrivateMessages(@QueryMap Map<String, String> params);

例如,我从 Retrofit 创建的 Rest API 中提供了一个 Observable。

在我的 ListView 中,我正在观察这个 Observable

api.getPrivateMessages(params).subscribe(new Observer());

我也有一个用于 Espresso 测试的 API 包装器,我在那里订阅了相同的 Observable。这样,首先调用 API 包装器中的观察者,然后才调用 ListView 中的观察者被称为。

public class IdlingWrapper implements Api, IdlingResource { 
....

public IdlingWrapper(Api realApi) {
this.realApi = realApi;
}

...

public Observable<PrivateMessagesResponse> getPrivateMessages(@QueryMap Map<String, String> params); {
counter.incrementAndGet();
return wrapObservable(realApi.getPrivateMessages(params));
}

protected <T> Observable<T> wrapObservable(final Observable<PrivateMessagesResponse> observable) {
//what to do here?
}
}

有没有办法强制某些观察者在所有其他观察者完成后得到通知?或者类似的东西?

有点像

Observable observable = getObservable();
observable.subscribeAsLast(new LastObserver());
observable.subscribe(new ObserverA());
observable.subscribe(new ObserverB());

因此 ObserverA 将首先收到通知,然后是 ObserverB,然后才通知 LastObserver

或者我可以找出所有注册观察员何时收到通知和完成的任何其他方法。

最佳答案

我不太确定您在 IdlingWrapper 中尝试做什么,但我认为当前的实现非常脆弱。

我认为需要发生的最重要的事情是保证 observable 只能被调用一次。

这里有一个快速实现来演示这一点以及我对 wrapObservable 的实现。

public class Test {

private static int counter = 0;

private static final List<Observable<?>> list = Collections.synchronizedList(new ArrayList<>());

protected static <T> Observable<T> wrapObservable(final Observable<T> original) {
// run atleast once???
synchronized (list) {
list.add(original);
}

return Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
synchronized (list) {
counter++;
if (!list.contains(original)) {
subscriber.onError(new Exception("You can only subscribe once!"));
return;
}
list.remove(original);
}

// Sleep to make it easier to see things happening...
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}

subscriber.onCompleted();
}
}).flatMap(new Func1<Void, Observable<? extends T>>() {
@Override
public Observable<? extends T> call(Void o) {
return original;
}
}).finallyDo(new Action0() {
@Override
public void call() {
synchronized (list) {
counter--;
if (list.size() == 0 && counter == 0) {
System.err.println("finally");
}
}
}
});
}

public static void main(String[] args) throws InterruptedException {
for(int i = 0; i < 10; i++) {
// running in io thread for simulating async call.
Observable<String> test = wrapObservable(Observable.from("TEST!!!!!!")).subscribeOn(Schedulers.io());
test.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.err.println("completed");
}

@Override
public void onError(Throwable e) {
System.err.println("error");
}

@Override
public void onNext(String s) {
System.err.println("next");
}
});

// example of calling the same observable twice.
test.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.err.println("completed");
}

@Override
public void onError(Throwable e) {
System.err.println("error");
}

@Override
public void onNext(String s) {
System.err.println("next");
}
});
}

Thread.sleep(10000);
}
}

关于android - 一种在 RxJava 中定义订阅者顺序的方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23521870/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com