gpt4 book ai didi

安卓改造 2 + RxJava : listen to endless stream

转载 作者:塔克拉玛干 更新时间:2023-11-01 21:23:41 24 4
gpt4 key购买 nike

可以用Retrofit + RxJava 来听个不停吗?例如 Twitter 流。我所拥有的是:

public interface MeetupAPI {
@GET("http://stream.meetup.com/2/rsvps/")
Observable<RSVP> getRSVPs();
}

MeetupAPI api = new Retrofit.Builder()
.baseUrl(MeetupAPI.RSVP_API)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
.create(MeetupAPI.class);

api.getRSVPs()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(rsvp -> Log.d(TAG, "got rsvp"),
error -> Log.d(TAG, "error: " + error),
() -> Log.d(TAG, "onComplete"));

但是“onComplete”是在第一个对象被解析后调用的。有没有办法让 Retrofit 保持开放状态,直至另行通知?

最佳答案

这是我的解决方案:

您可以使用@Streaming 注释:

public interface ITwitterAPI {

@GET("/2/rsvps")
@Streaming
Observable<ResponseBody> twitterStream();
}

ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build().create(ITwitterAPI.class);

使用@Streaming,我们可以从ResponseBody 获取原始输入。

这里是我的函数,它用事件的行来包装主体:

public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
});
}

和结果用法:

api.twitterStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);

关于优雅停止的更新

当我们取消订阅时,retrofit 会关闭输入流。但是不可能从输入流本身检测到输入流是否关闭,所以唯一的方法 - 尝试从流中读取 - 我们会收到 Socket closed 消息的异常。我们可以将此异常解释为结束:

        @Override
public void call(Subscriber<? super String> subscriber) {
boolean isCompleted = false;
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
if (e.getMessage().equals("Socket closed")) {
isCompleted = true;
subscriber.onCompleted();
} else {
throw new UncheckedIOException(e);
}
}
//if response end we get here
if (!isCompleted) {
subscriber.onCompleted();
}
}

如果连接因为响应结束而关闭,我们没有任何异常(exception)。在这里 isCompleted 检查一下。如果我错了请告诉我:)

关于安卓改造 2 + RxJava : listen to endless stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36603368/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com