gpt4 book ai didi

rx-java - 如何在RxJava中将Observable子类化?

转载 作者:行者123 更新时间:2023-12-03 14:06:12 25 4
gpt4 key购买 nike

我知道您应该不惜一切代价避免这种情况,但是如果我在RxJava中有一个有效的用例用于Observable子类,该怎么办?可能吗?我该怎么办?

在这种特定情况下,我有一个“存储库”类,该类当前返回Requests:

class Request<T> {
public abstract Object key();
public abstract Observable<T> asObservable();

[...]

public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
Request<T> self = this;
return new Request<T>() {
@Override public Object key() { return self.key; }
@Override public Observable<T> asObservable() { return transformation.call(self); }
}
}
}


然后,在需要请求键(如缓存)的上下文中,我使用transform方法修改可观察到的响应(asObservable):

 service.getItemList() // <- returns a Request<List<Item>>
.transform(r -> r.asObservable()
// The activity is the current Activity in Android
.compose(Operators.ensureThereIsAnAccount(activity))
// The cache comes last because we don't need auth for cached responses
.compose(cache.cacheTransformation(r.key())))
.asObservable()
[... your common RxJava code ...]


现在,如果我的Request类是一个Observable子类,这将非常方便,因为这样我就可以消除所有的.asObservable()调用,并且客户端甚至不需要了解我的Request类。

最佳答案

可以将Observable子类化(我们对SubjectConnectableObservable执行此操作),但是需要额外考虑,因为您需要传递OnSubscribe回调以处理传入的Subscriber。对于我来说,尚不清楚您的请求在有人订阅时应该怎么做,因此,我给您提供两个扩展Observable的示例:

可观察,没有共享的可变状态

如果没有在订阅者之间共享可变状态,则可以扩展Observable并将操作传递给super

public final class MyObservable extends Observable<Long> {
public MyObservable() {
super(new OnSubscribe<Long>() {
@Override public void call(Subscriber<? super Long> child) {
child.onNext(System.currentTimeMillis());
child.onCompleted();
}
});
}
}


共享可变状态下可观察

这通常比较棘手,因为您需要同时通过 OnSubscribe方法和Observable方法访问共享状态,但是Java不允许您在 OnSubscribe完成之前接触 super内部类的实例字段。解决方案是从构造函数中排除这种共享状态和 OnSubscribe,并使用静态工厂方法来设置两者:

public final class MySharedObservable extends Observable<Long> {
public static MySharedObservable create() {
final AtomicLong counter = new AtomicLong();
OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> t1) {
t1.onNext(counter.incrementAndGet());
t1.onCompleted();
}
};
return new MySharedObservable(onSubscribe, counter);
}
private AtomicLong counter;

private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
super(onSubscribe);
this.counter = counter;
}
public long getCounter() {
return counter.get();
}
}

关于rx-java - 如何在RxJava中将Observable子类化?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28922375/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com