gpt4 book ai didi

rx-java - 压缩可观察物的错误处理

转载 作者:行者123 更新时间:2023-12-04 04:21:21 27 4
gpt4 key购买 nike

我的用例是:我获得了一个永久链接列表,并且需要为每个永久链接发出两个REST请求才能部分获取其数据。当两个请求都返回时,我想将它们的信息合并在一起并对其进行处理(在这里-打印出来)。我想使用zip运算符使用代码来实现。这是我当前的代码(以及我正在使用的库的模拟):

public class Main {

public static void main(String[] args) {
ContentManager cm = new ContentManager();

Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
if (dataContent == null || streamUrlContent == null) {
System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
return Observable.empty();
}

return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}))
.subscribe(System.out::println);
}
}

class SubscribingRestCallback implements RestCallback {

private final Subscriber<? super Content> subscriber;

public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}

@Override
public void onFailure(int code, String message) {
System.err.println(message);
subscriber.onNext(null);
subscriber.onCompleted();
}
}

public class Content {

public final String permalink;

public final String logoUrl;

public final String streamUrl;

public Content(String permalink, String logoUrl, String streamUrl) {
this.permalink = permalink;
this.logoUrl = logoUrl;
this.streamUrl = streamUrl;
}

@Override
public String toString() {
return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
}
}

public interface RestCallback {

void onSuccess(Content content);

void onFailure(int code, String message);
}

class ContentManager {

private final Random random = new Random();

public List<String> getPermalinks(int n) {
List<String> permalinks = new ArrayList<>(n);
for (int i = 1; i <= n; ++i) {
permalinks.add("perma_" + i);
}

return permalinks;
}

public void getDataByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, false);
}

public void getStreamByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, true);
}

private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
// simulate network latency and unordered results
new Thread(() -> {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
String logoUrl;
String streamUrl;
if (stream) {
logoUrl = null;
streamUrl = "http://" + permalink + "/stream";
} else {
logoUrl = "http://" + permalink + "/logo.png";
streamUrl = null;
}
callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
} else {
callback.onFailure(-1, permalink + " data failure");
}
}).start();
}
}

通常,它可以工作,但是我不喜欢此实现中的错误处理。基本上,REST请求可能会失败,在这种情况下, onFailure方法将调用 subscriber.onNext(null),以便 zip方法始终可以使用某些功能(一个请求可能失败了,但另一个请求可能没有成功,我不知道哪个失败)。然后,在 zip函数中,我需要一个 if,以检查它们是否都不是 null(如果任何部分 Contentnull,我的代码将会崩溃)。

如果可能的话,我希望能够使用 null运算符过滤掉 filter。也许有比在失败情况下发出 null值更好的方法,但是仍然可以与 zip函数一起使用吗?

最佳答案

首先,向Subscriber通知错误的正确方法是调用subscriber.onError方法:

class SubscribingRestCallback implements RestCallback {
private final Subscriber<? super Content> subscriber;

public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}

@Override
public void onFailure(int code, String message) {
subscriber.onError(new Exception(message));
}
}

即使您不希望整个流失败,也仍然需要调用 subscriber.onError()方法。还有其他一些方法可以使错误变浅。其中之一是 onErrorResumeNext运算符:
Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}).onErrorResumeNext(Observable.empty()))
.subscribe(System.out::println);

编辑

I have one last question: if you notice my zipper functions, I return Observable.empty() if the two objects cannot be zipped, and once I return Content. This seems wrong. How should I handle such error conditions in the zipper function?



是的,返回 Observable.empty()是完全错误的。从 zip函数引发异常似乎是最好的解决方案:
Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
if (!isDataValid(dataContent, streamUrlContent)) {
throw new RuntimeException("Something went wrong.");
}
return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}).onErrorResumeNext(Observable.empty()))
.subscribe(System.out::println);

关于rx-java - 压缩可观察物的错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30822627/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com