
java - RxJava : observable that contains an asynchronous call

Reposted. Author: 塔克拉玛干. Updated: 2023-11-03 05:17:27

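I am trying to understand RxJava and ran into the following situation.

Consider the method below, which returns an observable that calls NsdManager.registerService. The registerService method takes a listener that is invoked when registration succeeds (or fails).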

public Observable<Boolean> registerService() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {

        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);

            // how to proceed?
        }
    });
}

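The observable can only notify its subscriber once the listener has been called, but the listener is invoked asynchronously.

How can I do this with RxJava?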


I came up with the following using a BehaviorSubject. I don't know whether it is the best solution, but it does work.

private BehaviorSubject<Boolean> registrationSubject;

public Observable<Boolean> registerService() {
    registrationSubject = BehaviorSubject.create();

    Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(Subscriber<? super Boolean> subscriber) {
            NsdServiceInfo serviceInfo = new NsdServiceInfo();
            serviceInfo.setServiceName(serviceName);
            serviceInfo.setServiceType(NSD_SERVICE_TYPE);
            serviceInfo.setPort(serverSocket.getLocalPort());

            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD, registrationListener);
        }
    }).subscribe(registrationSubject);

    return registrationSubject;
}

private NsdManager.RegistrationListener registrationListener = new NsdManager.RegistrationListener() {
    @Override
    public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
        registrationSubject.onNext(false);
        registrationSubject.onCompleted();
    }

    @Override
    public void onServiceRegistered(NsdServiceInfo serviceInfo) {
        registrationSubject.onNext(true);
        registrationSubject.onCompleted();
    }

    @Override
    public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }

    @Override
    public void onServiceUnregistered(NsdServiceInfo serviceInfo) { }
};

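Best answer

I think it is best to avoid Subjects wherever possible. In your solution, you use the subject only to call onNext and onCompleted. However, inside the Observable.create() method you already have access to a subscriber on which you can call those same methods. In other words, you can wrap the complete setup of the event handler inside the Observable.create() method.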

public Observable<Boolean> registerService() {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {
        @Override
        public void call(final Subscriber<? super Boolean> subscriber) {
            NsdServiceInfo serviceInfo = new NsdServiceInfo();
            serviceInfo.setServiceName(serviceName);
            serviceInfo.setServiceType(NSD_SERVICE_TYPE);
            serviceInfo.setPort(serverSocket.getLocalPort());

            nsdManager.registerService(serviceInfo, NsdManager.PROTOCOL_DNS_SD,
                new NsdManager.RegistrationListener() {
                    @Override
                    public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                        }
                    }

                    @Override
                    public void onServiceRegistered(NsdServiceInfo serviceInfo) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(true);
                            subscriber.onCompleted();
                        }
                    }

                    @Override
                    public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) { }

                    @Override
                    public void onServiceUnregistered(NsdServiceInfo serviceInfo) { }
                }
            );
        }
    });
}
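
The core idea of the answer, creating the listener inside the factory so that it can deliver the result directly instead of going through a shared field, does not depend on RxJava or Android. The sketch below shows the same shape using only the Java standard library, with CompletableFuture swapped in for the Observable. It is an analogy, not the answer's code: RegistrationListener and FakeRegistrar here are hypothetical stand-ins for NsdManager.RegistrationListener and NsdManager, and the error code is an arbitrary placeholder.

```java
import java.util.concurrent.CompletableFuture;

class CallbackBridge {

    /** Hypothetical, reduced stand-in for NsdManager.RegistrationListener. */
    interface RegistrationListener {
        void onServiceRegistered(String serviceName);
        void onRegistrationFailed(String serviceName, int errorCode);
    }

    /** Hypothetical stand-in for NsdManager: reports its result later, on another thread. */
    static class FakeRegistrar {
        private final boolean succeed;

        FakeRegistrar(boolean succeed) {
            this.succeed = succeed;
        }

        void registerService(final String serviceName, final RegistrationListener listener) {
            Thread worker = new Thread(() -> {
                if (succeed) {
                    listener.onServiceRegistered(serviceName);
                } else {
                    listener.onRegistrationFailed(serviceName, 3); // arbitrary placeholder code
                }
            });
            worker.setDaemon(true);
            worker.start();
        }
    }

    /**
     * The listener is created per call and captures the result object,
     * so no field is shared between calls and no Subject-like relay is needed.
     */
    static CompletableFuture<Boolean> registerService(FakeRegistrar registrar, String serviceName) {
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        registrar.registerService(serviceName, new RegistrationListener() {
            @Override
            public void onServiceRegistered(String name) {
                result.complete(true);
            }

            @Override
            public void onRegistrationFailed(String name, int errorCode) {
                result.complete(false);
            }
        });
        return result;
    }
}
```

Calling CallbackBridge.registerService(new CallbackBridge.FakeRegistrar(true), "demo").join() blocks until the fake callback fires and then yields true. Because each call builds its own listener and result, two overlapping registrations cannot overwrite each other's state, which is the risk with a single registrationSubject field.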

Regarding java - RxJava : observable that contains an asynchronous call, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/25259901/
