gpt4 book ai didi

java - 可观察到的热点

转载 作者:太空宇宙 更新时间:2023-11-04 10:39:48 24 4
gpt4 key购买 nike

我需要一些介于热和冷 Observable 之间的东西。它应该在客户端订阅时发出项目,并在客户端取消订阅时停止发出。但是,当客户端订阅同一个 Observable 时,应该交付剩余的项目。最后一件事是项目之间的时间。

Observable<String> hotishObservable = createHotishObservable("a", "b", "c");
Disposable sub = hotishObservable.subscribe();
// emit "a"
// 1 second passed
// emit "b"
sub.dispose()
Disposable sub = hotishObservable.subscribe();
// emit "c"

显而易见的解决方案是扩展 ObservableOnSubscribe 并处理 ObservableEmitter:

class HotishSub implements ObservableOnSubscribe<String> {

public HotishSub(String... items) {
this.items = items;
}

@Override
public void subscribe(ObservableEmitter<String> emitter) {
if(isNotEmpty())
emitter.onNext(nextItem);
executor.schedule(this::handleNext, 1000, TimeUnit.MILLISECONDS);
else
emitter.onComplete();
}

private void handleNext(){
//if emitter is not disposed and there're still items then emit it
}
}

Observable<String> createHotishObservable(String... items){
return Observable.create(new HotishSub(items));
}

还有更好的选择吗?

它是来自简化聊天机器人的消息流所必需的。 UI 客户端代码使用相同的 Observable 来获取来自机器人和真实用户的消息流。

最佳答案

我使用Flowable.generate来做到这一点。

  public static <T> Flowable<T> create(T... ts) {
List<T> list = new ArrayList<>(Arrays.asList(ts));
return Flowable.generate(() -> list, (l, e) -> {
if (l.isEmpty()) {
e.onComplete();
} else {
e.onNext(l.remove(0));
if (!l.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// you can use other way to delay it
}
}
}
});
}

然后测试一下

  public static void main(String[] args) throws Exception {
Flowable<String> ob = create("a", "b", "c", "d", "e");
Disposable d = ob.subscribeOn(Schedulers.computation())
.subscribe(i -> System.out.println(System.currentTimeMillis() + "\t" + i));
Thread.sleep(2500);
d.dispose();
ob.subscribeOn(Schedulers.computation())
.blockingSubscribe(i -> System.err.println(System.currentTimeMillis() + "\t" + i));
}

并输出:

1520304164412   a // sys.out
1520304165413 b // sys.out
1520304166413 c // sys.out
1520304166928 d // sys.err
1520304167927 e // sys.err

关于java - 可观察到的热点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49112023/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com