gpt4 book ai didi

java - 如何创建一个 Observable 来在完成后对 Observable 流进行 Margin 处理?

转载 作者:行者123 更新时间:2023-12-01 12:04:15 25 4
gpt4 key购买 nike

我正在使用 Jetty WebSocket 客户端从客户端设置 WebSocket 连接。我正在尝试创建一个类,以 Observable 的形式提供事件流。

我设法通过编写 POJO @WebSocket 来实现这一点将所有内容发布到 SerializedSubject<SocketEvent, SocketEvent> eventSubject = new SerializedSubject<>(PublishSubject.create()); 的类一切都运转良好。

如何让它在每次连接断开时重新连接?

我尝试从 Observable.interval 开始和flatMap - 发送至Observable<ObservableSocket.SocketEvent> connect(String url)每个连接返回 Observable。

Observable<Long> reconnectObservable = Observable.interval(1000, TimeUnit.MILLISECONDS);
Observable<ObservableSocket.SocketEvent> composed = reconnectObservable.flatMap(aLong -> {
try {
System.out.println("Connect");
return connect(url);
} catch (Exception e) {
System.out.println("Exception: " + e);
return Observable.just(new ObservableSocket.SocketCloseEvent(999, "Exception: " + e));
}
});

问题是,它每 1 秒创建另一个连接。如何让 flatMap 等待内部 Observable 完成?

最佳答案

Observable.range(0, Integer.MAX_VALUE).concatMap(tick -> { ... });

concatMap 维护一个 SerialSubscription,并且一次只会订阅一个发出的可观察对象,并等待每个可观察对象终止。该范围提供无限信号(在本例中,无穷大结束于大约 20 亿:P),并且 concatMap 将根据创建的每个内部可观察量一次进行一个连接。

关于java - 如何创建一个 Observable 来在完成后对 Observable 流进行 Margin 处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27755179/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com