gpt4 book ai didi

java - RxJava 与顶点 : can't have multiple subscriptions exception

转载 作者:行者123 更新时间:2023-11-30 02:55:31 24 4
gpt4 key购买 nike

我正在尝试使用 RxJava 避免 vertx 回调 hell 。但我有“rx.exceptions.OnErrorNotImplementedException:不能有多个订阅”。这里出了什么问题?

public class ShouldBeBetterSetter extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
Func1<AsyncMap<String,Long>, Observable<Void>> obtainAndPutValueToMap = stringLongAsyncMap -> {
Long value = System.currentTimeMillis();
return stringLongAsyncMap.putObservable("timestamp", value)
.doOnError(Throwable::printStackTrace)
.doOnNext(aVoid -> System.out.println("succesfully putted"));
};

Observable<AsyncMap<String,Long>> clusteredMapObservable =
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
.doOnError(Throwable::printStackTrace);

vertx.periodicStream(3000).toObservable()
.flatMap(l-> clusteredMapObservable.flatMap(obtainAndPutValueToMap))
.forEach(o -> {
System.out.println("just printing.");
});
}
}

工作 Verticle(无 Rx)可以在这里找到: https://gist.github.com/IvanZelenskyy/9d50de8980b7bdf1e959e19593f7ce4a

最佳答案

vertx.sharedData().getClusterWideMapObservable("mymap") 返回 observable,它仅支持单个订阅者 - 因此异常(exception)。一个值得一试的解决方案是:

Observable<AsyncMap<String,Long>> clusteredMapObservable =
Observable.defer(
() -> vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap")
);

这样每次调用 clusteredMapObservable.flatMap() 时,它都会订阅 Observable.defer() 返回的新 observable。

编辑

如果可以使用相同的 AsyncMap,如 @Ivan Zelenskyy 所指出的,解决方案可以是

Observable<AsyncMap<String,Long>> clusteredMapObservable =   
vertx.sharedData().<String,Long>getClusterWideMapObservable("mymap").cache()

关于java - RxJava 与顶点 : can't have multiple subscriptions exception,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37304880/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com