gpt4 book ai didi

java - 是否有一个可观察对象只是传播错误而不终止自身?

转载 作者:行者123 更新时间:2023-12-01 11:51:07 24 4
gpt4 key购买 nike

我正在使用PublishSubject在负责同步的类中。同步完成后,所有订阅者都会收到通知。发生错误时也会发生同样的情况。我注意到,下次发生错误后订阅时,它会立即返回给订阅者。

所以这个类可能看起来像这样:

public class Synchronizer {
private final PublishSubject<Result> mSyncHeadObservable = PublishSubject.create();
private final ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(1, 1,
10, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(true),
new ThreadPoolExecutor.DiscardPolicy());


public Observable<Result> syncHead(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...

mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});

是否有一个可观察对象可以充当代理?可能是其他 Rx 方法?

更新:我遵循 @akarnokd 方法并发出包装在 RxJava Notification 中的事件。然后通过 flatMap() 解开它们。因此 Synchronizer 类的客户端不需要这样做。

    //...
private PublishSubject<Notification<Result>> mSyncHeadObservable = PublishSubject.create();

public Observable<Result> syncHead(final int chunkSize) {

return mSyncHeadObservable.flatMap(new Func1<Notification<Result>, Observable<Result>>() {
@Override
public Observable<Result> call(Notification<Result> result) {
if (result.isOnError()) {
return Observable.error(result.getThrowable());
}

return Observable.just(result.getValue());
}
}).doOnSubscribe(
new Action0() {
@Override
public void call() {
startHeadSync(chunkSize);
}
});
}

private void startHeadSync(final int chunkSize) {
mExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//Do some work which either returns a result or throws an error
//...

mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
} catch (Throwable error) {
mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
}
}
});
}
//...

最佳答案

我不确定您希望通过此设置实现什么目的,但通常,为了避免 PublishSubject 出现终止条件,您应该将您的值和错误包装到一个通用结构中,并且始终发出这些,绝不发出任何 onErroronCompleted。一种选择是使用 RxJava 自己的事件包装器 Notification,并且您的订阅者 应该解开该值。

关于java - 是否有一个可观察对象只是传播错误而不终止自身?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28815966/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com