gpt4 book ai didi

android - Rx Observable 定期发射值

转载 作者:IT老高 更新时间:2023-10-28 23:04:25 25 4
gpt4 key购买 nike

我必须定期轮询一些 RESTful 端点以刷新我的 android 应用程序的数据。我还必须根据连接暂停和恢复它(如果手机离线,甚至不需要尝试)。我当前的解决方案正在运行,但它使用标准 Java 的 ScheduledExecutorService执行周期性任务,但我想留在 Rx 范式中。

这是我当前的代码,为简洁起见,省略了部分代码。

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
@Override
public void call(final Subscriber<? super UserProfile> subscriber) {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final Runnable runnable = new Runnable() {
@Override
public void run() {
// making http request here
}
};
final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
networkStatusObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean networkAvailable) {
if (!networkAvailable) {
pause();
} else {
pause();
futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
}
}

private void pause() {
for (ScheduledFuture<?> future : futures) {
future.cancel(true);
}
futures.clear();
}
});

final Subscription subscription = new Subscription() {
private boolean isUnsubscribed = false;

@Override
public void unsubscribe() {
scheduledExecutorService.shutdownNow();
isUnsubscribed = true;
}

@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
};
subscriber.add(subscription);
}
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservable基本上是一个广播接收器,包裹在 Observable<Boolean> 中,表示手机已联网。

正如我所说,这个解决方案有效,但我想使用 Rx 方法进行定期轮询并发出新的 UserProfile s,因为手动安排事情有很多问题,我想避免这些问题。我知道Observable.timerObservable.interval ,但不知道如何将它们应用于此任务(我不确定是否需要使用它们)。

最佳答案

在这个 GitHub 问题上有一些方法可能对您有所帮助。

https://github.com/ReactiveX/RxJava/issues/448

三个实现分别是:


Observable.interval

Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
.flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
public Observable<Notification<AppState>> call(Long seconds) {
return lyftApi.updateAppState(params).materialize(); } });

Scheduler.schedulePeriodically

Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });

Manual Recursion

Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> o) {
return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
@Override
public Subscription call(Scheduler inner, Long t2) {
o.onNext("data-from-polling");
return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
}
});
}
}).toBlockingObservable().forEach(new Action1<String>() {
@Override
public void call(String v) {
System.out.println("output: " + v);
}
});

结论是手动递归是可行的方法,因为它会等到操作完成后再安排下一次执行。

关于android - Rx Observable 定期发射值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24557153/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com