gpt4 book ai didi

system.reactive - 使用 RxJava 并行调用网络服务。这是正确的方法吗?

转载 作者:行者123 更新时间:2023-12-04 18:05:53 25 4
gpt4 key购买 nike

想法是并行进行 3 个网络调用。 (我使用谷歌作为演示目的的服务。以下工作,但不确定这是正确的方法还是可以简化。如果我必须结合所有三个搜索的响应,我该怎么办?请指教。

public class GoogleSearchRx
{
public static void main(String args[])
{
CountDownLatch latch = new CountDownLatch(3);

search("RxJava").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);

search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);

//run the last one on current thread
search("Erik Meijer").subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);

try
{
latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}

public static Observable<Elements> search(String q)
{
String google = "http://www.google.com/search?q=";

String charset = "UTF-8";
String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!

return Observable.create(new Observable.OnSubscribe<Elements>()
{

@Override public void call(Subscriber<? super Elements> subscriber)
{
out.println(currentThreadName() + "\tOnSubscribe.call");

try
{
Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
subscriber.onNext(links);
}
catch (IOException e)
{
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
}
}

最佳答案

通过问题的“组合所有三个搜索的响应”部分,您可能正在寻找 Zip .

Observable<Elements> search1 = search("RxJava");
Observable<Elements> search2 = search("Reactive Extensions");
Observable<Elements> search3 = search("Eric Meijer");
Observable.zip(searc1, search2, search3,
new Func3<Elements, Elements, Elements, Elements>() {
@Override
public Elements call(Elements result1, Elements result2, Elements result3) {
// Add all the results together...
return results;
}
}
).subscribeOn(Schedulers.io()).subscribe(
links -> {
links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
latch.countDown();
},
e -> {
out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
latch.countDown();
}
);

这假设您想同时处理所有结果(在订阅者中,这里)而不关心给定结果使用了哪个查询。

注意 zip 有不同的版本函数,取自 1..N 个观察值,和 Func1Func9FuncN ,允许您压缩特定或任意数量的 observable。

关于system.reactive - 使用 RxJava 并行调用网络服务。这是正确的方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28005876/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com