gpt4 book ai didi

java - 在 ConnectableObservable 中执行异步查询的线程问题

转载 作者:行者123 更新时间:2023-11-29 23:21:07 25 4
gpt4 key购买 nike

所以我有一个接收字符串的 ConnectableObservable,我需要为每个字符串执行查询(异步)并等待结果返回

目前我正在使用闩锁等待查询返回,但可观察到它卡住了或者只是完成了第一个字符串而没有继续其他字符串。

我已经隔离了这个例子中的问题

private Subscriber<? super String> stringStreamInput;

ConnectableObservable<String> stringStream = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
stringStreamInput = subscriber;
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String perro) {
return getPerroDetails(perro);
}
})
.publish();
stringStream.connect();

调用模拟查询的方法

  private String getPerroDetails(final String perro) {
Log.d("Hey", "Perro " + perro);

//Simulating the query to get Perro details
String newPerro = executeQueryForPerro(perro);

Log.d("Hey", "NewPerro " + newPerro);
return newPerro;
}

使用 CountDownLatch 模拟查询

private String executeQueryForPerro(final String perro) {
final String[] newPerro = new String[1];
final CountDownLatch latch = new CountDownLatch(1);

//SIMULATION OF THE QUERY
Handler handler = new Handler();
handler.postDelayed(new Runnable() {
@Override
public void run() {
newPerro[0] = perro + " reloaded ";
latch.countdown();
}
}, 1000);

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return newPerro[0];
}

测试

 String[] perros = new String[]{"Beagle", "Alaska", "Chihuahua", "PitBull", "RedBull"};
for (String perro : perros) {
stringStreamInput.onNext(perro);
}

我可以采取哪些其他方法来确保 Query 正确执行并等待结果返回以转到下一步?

有什么神奇的 Rx 解决方案吗?

非常感谢

最佳答案

当您将 RxJava 世界与显式线程管理混合使用时,您会经常遇到问题,通常是在测试时。 RxJava 工具包非常灵活,几乎可以提供您需要的一切。

您的数据源需要类似于 Subject。在您的代码中,您直接向订阅者发送值,完全绕过您的观察者链,这可能不是您想要的。

PublishSubject<String> stringStreamInput = PublishSubject.create();

然后,更改 getPerroDetails() 以返回最终将提供结果的 Observable:

Observable<String> getPerroDetails(final String perro);

然后您可以使用flatMap() 运算符来获取详细信息:

Observable<String> stringStream = stringStreamInput
.observeOn(Schedulers.io())
.flatMap( input -> getPerroDetails(input) )
.publish();

stringStream.connect();

getPerroDetails() 是如何实现的?您使用 RxJava 运算符之一来调用异步方法:

Observable<String> getPerroDetails(String perro)
{
return Observable
.just( perro + " reloaded" )
.delay(1000, TimeUnit.MILLISECONDS));
}

关于java - 在 ConnectableObservable 中执行异步查询的线程问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54297013/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com