
android - RxJava + Websocket - How to add an Observable to a Websocket listener?

Reposted. Author: 行者123. Updated: 2023-12-04 11:35:15

I have a ViewModel that observes an RxJava Observable in my MainRepo class. I am trying to get the WebSocketListener in my MainRepo class to emit events, but I'm not sure how to do that.
MainRepo class:

private WebSocket ws;

public void createWsConnection() {
    Request request = new Request.Builder()
            .url(Constants.WEBSOCKET_ENDPOINT)
            .addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
            .build();

    OkHttpClient client = new OkHttpClient
            .Builder()
            .pingInterval(30, TimeUnit.SECONDS)
            .build();

    this.ws = client.newWebSocket(request, webSocketListener);
}
This is where I'm confused. I don't know how to use the websocket together with an RxJava Observable.
public Observable<String> createListener() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) {
            // I don't know what to put here in order to emit messages
            // back to my ViewModel class using the websocket listener
        }
    });
}
The websocket listener:
private WebSocketListener webSocketListener = new WebSocketListener() {

    @Override
    public void onOpen(@NotNull WebSocket webSocket, Response response) {
        // %s is needed for Timber to include the argument in the log line
        Timber.d("Ws connection opened: %s", response);
    }

    @Override
    public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
        Timber.d("Ws connection closing...");
    }

    @Override
    public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
        Timber.d("Ws connection closed...");
    }

    @Override
    public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
        Timber.d("Ws incoming message.");

    }

    @Override
    public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
        // response may be null here, so avoid calling response.toString() directly
        Timber.e(t, "Ws connection failure: %s", response);

    }
};
A function in my ViewModel class that observes the Observable in my MainRepo class:
public void connectToWs() {
    mainRepo.createListener()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Timber.d("Subscribed");
                }

                @Override
                public void onNext(@NonNull String s) {
                    Timber.d("Message: " + s);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Timber.e(e, "Something went wrong.");
                }

                @Override
                public void onComplete() {
                    Timber.d("On complete.");
                }
            });
}

Best answer

Create a PublishSubject and change your createListener method to return it:

// Hot stream that relays incoming websocket messages to current subscribers
private PublishSubject<String> publishSubject = PublishSubject.create();

public Observable<String> createListener() {
    return publishSubject;
}
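PublishSubject is an Observable, so note that you don't need to change the method signature, although I suggest renaming the method to observeMessages.
Then, in your websocket listener, you can send messages to the PublishSubject through its onNext method. You should also call onComplete in the onClosed method and onError in the onFailure method: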
private WebSocketListener webSocketListener = new WebSocketListener() {

    @Override
    public void onOpen(@NotNull WebSocket webSocket, Response response) {
        // %s is needed for Timber to include the argument in the log line
        Timber.d("Ws connection opened: %s", response);
    }

    @Override
    public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
        Timber.d("Ws connection closing...");
    }

    @Override
    public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
        Timber.d("Ws connection closed...");

        // Signal subscribers that no more messages will arrive
        publishSubject.onComplete();
    }

    @Override
    public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
        Timber.d("Ws incoming message.");

        // Forward the incoming message to subscribers
        publishSubject.onNext(text);
    }

    @Override
    public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
        // response may be null here, so avoid calling response.toString() directly
        Timber.e(t, "Ws connection failure: %s", response);

        // Propagate the failure to subscribers
        publishSubject.onError(t);
    }
};
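
One consequence of this design is worth spelling out. As far as I know, a PublishSubject only delivers items to observers that are already subscribed, and once onComplete or onError has been called it drops any later onNext calls. That suggests the ViewModel should subscribe before createWsConnection() is called, and that a reconnect after onClosed or onFailure would need a fresh subject. The sketch below is a dependency-free model of that behaviour, not RxJava itself: MiniSubject and MiniSubjectDemo are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in that models PublishSubject-like semantics. Not RxJava. */
final class MiniSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private boolean terminated = false;

    /** Registers an observer; it only sees values emitted after this call. */
    void subscribe(Consumer<T> observer) {
        if (!terminated) {
            observers.add(observer);
        }
    }

    /** Delivers a value to current observers; ignored after termination. */
    void onNext(T value) {
        if (terminated) {
            return;
        }
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(value);
        }
    }

    /** Marks the stream as finished, e.g. when the socket has closed. */
    void onComplete() {
        terminated = true;
        observers.clear();
    }
}

final class MiniSubjectDemo {
    /** Replays a typical websocket lifecycle and returns what the observer saw. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniSubject<String> subject = new MiniSubject<>();

        subject.onNext("early");          // nobody subscribed yet: lost
        subject.subscribe(received::add); // the ViewModel subscribes
        subject.onNext("hello");          // delivered
        subject.onComplete();             // socket closed
        subject.onNext("late");           // dropped after the terminal event

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello]
    }
}
```

Only the message sent between subscribing and completing reaches the observer. If messages sent before subscription matter, or if the socket should reconnect, a different arrangement (for example creating a new subject per connection) may be a better fit.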

Regarding "android - RxJava + Websocket - How to add an Observable to a Websocket listener?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/67782867/
