gpt4 book ai didi

android - RXJava - 制作一个可暂停的可观察对象(例如使用缓冲区和窗口)

转载 作者:IT老高 更新时间:2023-10-28 23:35:36 28 4
gpt4 key购买 nike

我想创建执行以下操作的可观察对象:

  • 在暂停时缓冲所有项目
  • 立即发出项目,而不是暂停
  • 暂停/恢复触发器必须来自另一个可观察对象
  • 它必须保存以供不在主线程上运行的可观察对象使用,并且必须保存从主线程更改暂停/恢复状态

我想使用 BehaviorSubject<Boolean>作为触发器并将此触发器绑定(bind)到 Activity 的 onResumeonPause事件。 (附上代码示例)

问题

我已经设置了一些东西,但它没有按预期工作。我使用它如下:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();

目前的问题是,变体 1 应该可以正常工作,但有时,事件只是没有发出 - 阀门没有发出,直到阀门一切正常(可能是线程问题......)!解决方案 2 更简单并且似乎有效,但我不确定它是否真的更好,我不这么认为。我实际上不确定,为什么解决方案一有时会失败,所以我不确定解决方案 2 是否解决了(目前对我来说未知)问题......

谁能告诉我可能是什么问题,或者简单的解决方案是否应该可靠地工作?或者告诉我一个可靠的解决方案?

代码

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser 函数

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

Activity

public class BaseActivity extends AppCompatActivity {

private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

public BaseActivity(Bundle savedInstanceState)
{
super(args);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}

public PublishSubject<Boolean> getPauser()
{
return pauser;
}

@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}

@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}

最佳答案

您实际上可以使用 .buffer() 运算符将其传递给 observable,定义何时停止缓冲,示例:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);

来自第 5 章,“驯服序列”:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

您可以使用 PublishSubject 作为 Observable 在自定义运算符中为其提供元素。每次需要开始缓冲时,通过 Observable.defer(() -> createBufferingValve())

创建实例

关于android - RXJava - 制作一个可暂停的可观察对象(例如使用缓冲区和窗口),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39685467/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com