
java - Continue subscribing to data using Notification even after an exception

Reposted. Author: 行者123. Updated: 2023-12-02 02:18:09

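In the code below, the subscriber stops receiving data as soon as a timeout exception occurs. How can I make sure the subscriber does not stop when an exception occurs?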

import java.util.Arrays;
import java.util.Random;
import java.util.stream.Stream;

import rx.Notification;
import rx.Observable;

public class ReactiveDataService
{
    private static String[] quotes = {"ITEM1", "ITEM2", "ITEM3"};

    public Observable<Notification<String>> getStreamData()
    {
        return Observable.create(subscriber -> {
            if (!subscriber.isUnsubscribed())
            {
                Stream<String> streams = Arrays.stream(quotes);
                streams.map(quote -> quote.toString()).filter(quote -> quote != null)
                    .forEach(q -> {
                        subscriber.onNext(Notification.createOnNext(q));
                        try
                        {
                            // Pause 1 to 5 seconds between items, so some gaps exceed the 4-second timeout
                            Random rand = new Random();
                            Integer i = (rand.nextInt(5) + 1) * 1000;
                            Thread.sleep(i);
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                    });
            }
            subscriber.onCompleted();
        });
    }
}


import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import rx.Notification;
import rx.Observable;

public class ReactiveResource
{
    public static void main(String args[])
    {
        Observable<Notification<String>> watcher = new ReactiveResource().getData()
            .timeout(4, TimeUnit.SECONDS)
            .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
            // Replaces the failed stream with a single onError notification, after which the stream completes
            .onErrorResumeNext(th -> {
                return Observable.just(Notification.createOnError(new TimeoutException("Timed Out!")));
            });

        watcher.subscribe(
            ReactiveResource::callBack,
            ReactiveResource::errorCallBack,
            ReactiveResource::completeCallBack
        );
    }

    public static void callBack(Notification<String> data)
    {
        System.out.println(data.getValue());
    }

    public static void errorCallBack(Throwable throwable)
    {
        System.out.println(throwable instanceof TimeoutException);
        System.out.println(throwable);
    }

    public static void completeCallBack()
    {
        System.out.println("On completed successfully");
    }

    private Observable<Notification<String>> getData()
    {
        return new ReactiveDataService().getStreamData();
    }
}

Best answer

You can combine publish, mergeWith, and timer to achieve this effect:

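// RxJava 2.x types (the question's code uses the RxJava 1.x rx package)
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.Scheduler;
import java.util.concurrent.TimeUnit;

static <T> ObservableTransformer<T, T> onTimeoutKeepAlive(
        long timeout, TimeUnit unit, Scheduler scheduler, T keepAliveItem) {
    return upstream ->
        upstream.publish(o ->
            o.mergeWith(
                Observable.timer(timeout, unit, scheduler)
                    .map(t -> keepAliveItem)  // emit the keep-alive item when the timer fires
                    .takeUntil(o)             // cancel the pending timer as soon as the source emits
                    .repeat()                 // restart the timer after each item or keep-alive
                    .takeUntil(o.ignoreElements().toObservable())  // stop once the source terminates
            )
        );
}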

Usage:

source
    .compose(onTimeoutKeepAlive(
        10, TimeUnit.SECONDS, Schedulers.computation(),
        Notification.createOnError(new TimeoutException())
    ))
    .subscribe(/* ... */);
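
The idea behind the transformer is to treat a quiet period as just another item instead of a terminal error. That idea can be tried out without RxJava. The following is a minimal plain-JDK sketch of the idea only, not the answer's implementation: `KeepAliveDemo`, `drainWithKeepAlive`, the `DONE` marker, and the `"TIMEOUT"` item are hypothetical names chosen for illustration, and a `BlockingQueue` stands in for the source stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Hypothetical end-of-stream marker (an assumption of this sketch).
    static final String DONE = "<done>";

    /**
     * Reads from the queue until DONE arrives. Whenever no item shows up
     * within the timeout, keepAliveItem is recorded and reading continues,
     * so a quiet period never ends the consumer.
     */
    static List<String> drainWithKeepAlive(
            BlockingQueue<String> source, long timeout, TimeUnit unit, String keepAliveItem) {
        List<String> received = new ArrayList<>();
        while (true) {
            String item;
            try {
                item = source.poll(timeout, unit); // null means the timeout elapsed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return received;
            }
            if (item == null) {
                received.add(keepAliveItem);        // report the silence, keep listening
            } else if (DONE.equals(item)) {
                return received;                    // normal completion
            } else {
                received.add(item);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put("ITEM1");
                Thread.sleep(350);                  // longer than the consumer's timeout
                queue.put("ITEM2");
                queue.put("ITEM3");
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> out = drainWithKeepAlive(queue, 100, TimeUnit.MILLISECONDS, "TIMEOUT");
        producer.join();
        System.out.println(out);
    }
}
```

The number of `TIMEOUT` entries depends on thread timing, so the printed list may vary between runs; the three items still arrive in order, and the consumer only stops at the end marker.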

Regarding java - continuing to subscribe to data using Notification even after an exception, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48963669/
