- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Retrofit+Rxjava下载文件进度的实现由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
前言 。
最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义。点我查看.
准备工作 。
本文采用Dagger2,Retrofit,RxJava.
1
2
3
4
5
6
7
8
9
10
|
compile
'com.squareup.retrofit2:retrofit:2.0.2'
compile
'com.squareup.retrofit2:converter-gson:2.0.2'
compile
'com.squareup.retrofit2:adapter-rxjava:2.0.2'
//dagger2
compile
'com.google.dagger:dagger:2.6'
apt
'com.google.dagger:dagger-compiler:2.6'
//RxJava
compile
'io.reactivex:rxandroid:1.2.0'
compile
'io.reactivex:rxjava:1.1.5'
compile
'com.jakewharton.rxbinding:rxbinding:0.4.0'
|
改造ResponseBody 。
okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public
class
ProgressResponseBody
extends
ResponseBody {
private
ResponseBody responseBody;
private
BufferedSource bufferedSource;
public
ProgressResponseBody(ResponseBody responseBody) {
this
.responseBody = responseBody;
}
@Override
public
MediaType contentType() {
return
responseBody.contentType();
}
@Override
public
long
contentLength() {
return
responseBody.contentLength();
}
@Override
public
BufferedSource source() {
if
(bufferedSource ==
null
) {
bufferedSource = Okio.buffer(source(responseBody.source()));
}
return
bufferedSource;
}
private
Source source(Source source) {
return
new
ForwardingSource(source) {
long
bytesReaded =
0
;
@Override
public
long
read(Buffer sink,
long
byteCount)
throws
IOException {
long
bytesRead =
super
.read(sink, byteCount);
bytesReaded += bytesRead == -
1
?
0
: bytesRead;
//实时发送当前已读取的字节和总字节
RxBus.getInstance().post(
new
FileLoadEvent(contentLength(), bytesReaded));
return
bytesRead;
}
};
}
}
|
呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象.
FileLoadEvent 。
FileLoadEvent很简单,包含了当前已加载进度和文件总大小.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public
class
FileLoadEvent {
long
total;
long
bytesLoaded;
public
long
getBytesLoaded() {
return
bytesLoaded;
}
public
long
getTotal() {
return
total;
}
public
FileLoadEvent(
long
total,
long
bytesLoaded) {
this
.total = total;
this
.bytesLoaded = bytesLoaded;
}
}
|
RxBus 。
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
public
class
RxBus {
private
static
volatile
RxBus mInstance;
private
SerializedSubject<Object, Object> mSubject;
private
HashMap<String, CompositeSubscription> mSubscriptionMap;
/**
* PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
* Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,
* 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。
*/
private
RxBus() {
mSubject =
new
SerializedSubject<>(PublishSubject.create());
}
/**
* 单例 双重锁
* @return
*/
public
static
RxBus getInstance() {
if
(mInstance ==
null
) {
synchronized
(RxBus.
class
) {
if
(mInstance ==
null
) {
mInstance =
new
RxBus();
}
}
}
return
mInstance;
}
/**
* 发送一个新的事件
* @param o
*/
public
void
post(Object o) {
mSubject.onNext(o);
}
/**
* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
* @param type
* @param <T>
* @return
*/
public
<T> Observable<T> tObservable(
final
Class<T> type) {
//ofType操作符只发射指定类型的数据,其内部就是filter+cast
return
mSubject.ofType(type);
}
public
<T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
return
tObservable(type)
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next, error);
}
public
void
addSubscription(Object o, Subscription subscription) {
if
(mSubscriptionMap ==
null
) {
mSubscriptionMap =
new
HashMap<>();
}
String key = o.getClass().getName();
if
(mSubscriptionMap.get(key) !=
null
) {
mSubscriptionMap.get(key).add(subscription);
}
else
{
CompositeSubscription compositeSubscription =
new
CompositeSubscription();
compositeSubscription.add(subscription);
mSubscriptionMap.put(key, compositeSubscription);
// Log.e("air", "addSubscription:订阅成功 " );
}
}
public
void
unSubscribe(Object o) {
if
(mSubscriptionMap ==
null
) {
return
;
}
String key = o.getClass().getName();
if
(!mSubscriptionMap.containsKey(key)) {
return
;
}
if
(mSubscriptionMap.get(key) !=
null
) {
mSubscriptionMap.get(key).unsubscribe();
}
mSubscriptionMap.remove(key);
//Log.e("air", "unSubscribe: 取消订阅" );
}
}
|
FileCallBack 。
那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
public
abstract
class
FileCallBack<T> {
private
String destFileDir;
private
String destFileName;
public
FileCallBack(String destFileDir, String destFileName) {
this
.destFileDir = destFileDir;
this
.destFileName = destFileName;
subscribeLoadProgress();
}
public
abstract
void
onSuccess(T t);
public
abstract
void
progress(
long
progress,
long
total);
public
abstract
void
onStart();
public
abstract
void
onCompleted();
public
abstract
void
onError(Throwable e);
public
void
saveFile(ResponseBody body) {
InputStream is =
null
;
byte
[] buf =
new
byte
[
2048
];
int
len;
FileOutputStream fos =
null
;
try
{
is = body.byteStream();
File dir =
new
File(destFileDir);
if
(!dir.exists()) {
dir.mkdirs();
}
File file =
new
File(dir, destFileName);
fos =
new
FileOutputStream(file);
while
((len = is.read(buf)) != -
1
) {
fos.write(buf,
0
, len);
}
fos.flush();
unsubscribe();
//onCompleted();
}
catch
(FileNotFoundException e) {
e.printStackTrace();
}
catch
(IOException e) {
e.printStackTrace();
}
finally
{
try
{
if
(is !=
null
) is.close();
if
(fos !=
null
) fos.close();
}
catch
(IOException e) {
Log.e(
"saveFile"
, e.getMessage());
}
}
}
/**
* 订阅加载的进度条
*/
public
void
subscribeLoadProgress() {
Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent.
class
,
new
Action1<FileLoadEvent>() {
@Override
public
void
call(FileLoadEvent fileLoadEvent) {
progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal());
}
},
new
Action1<Throwable>() {
@Override
public
void
call(Throwable throwable) {
//TODO 对异常的处理
}
});
RxBus.getInstance().addSubscription(
this
, subscription);
}
/**
* 取消订阅,防止内存泄漏
*/
public
void
unsubscribe() {
RxBus.getInstance().unSubscribe(
this
);
}
}
|
开始下载 。
使用自己的ProgressResponseBody 。
通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度.
1
2
3
4
5
6
7
8
9
|
public
class
ProgressInterceptor
implements
Interceptor {
@Override
public
Response intercept(Chain chain)
throws
IOException {
Response originalResponse = chain.proceed(chain.request());
return
originalResponse.newBuilder()
.body(
new
ProgressResponseBody(originalResponse.body()))
.build();
}
}
|
构建Retrofit 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Module
public
class
ApiModule {
@Provides
@Singleton
public
OkHttpClient provideClient() {
OkHttpClient client =
new
OkHttpClient.Builder()
.addInterceptor(
new
ProgressInterceptor())
.build();
return
client;
}
@Provides
@Singleton
public
Retrofit provideRetrofit(OkHttpClient client){
Retrofit retrofit =
new
Retrofit.Builder()
.client(client)
.baseUrl(Constant.HOST)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
return
retrofit;
}
@Provides
@Singleton
public
ApiInfo provideApiInfo(Retrofit retrofit){
return
retrofit.create(ApiInfo.
class
);
}
@Provides
@Singleton
public
ApiManager provideApiManager(Application application, ApiInfo apiInfo){
return
new
ApiManager(application,apiInfo);
}
}
|
请求接口 。
1
2
3
4
5
|
public
interface
ApiInfo {
@Streaming
@GET
Observable<ResponseBody> download(
@Url
String url);
}
|
执行请求 。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public
void
load(String url,
final
FileCallBack<ResponseBody> callBack){
apiInfo.download(url)
.subscribeOn(Schedulers.io())
//请求网络 在调度者的io线程
.observeOn(Schedulers.io())
//指定线程保存文件
.doOnNext(
new
Action1<ResponseBody>() {
@Override
public
void
call(ResponseBody body) {
callBack.saveFile(body);
}
})
.observeOn(AndroidSchedulers.mainThread())
//在主线程中更新ui
.subscribe(
new
FileSubscriber<ResponseBody>(application,callBack));
}
|
在presenter层中执行网络请求.
通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public
void
load(String url){
String fileName =
"app.apk"
;
String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath();
Log.e(TAG,
"load: "
+fileStoreDir.toString() );
FileCallBack<ResponseBody> callBack =
new
FileCallBack<ResponseBody>(fileStoreDir,fileName) {
@Override
public
void
onSuccess(
final
ResponseBody responseBody) {
Toast.makeText(App.getInstance(),
"下载文件成功"
,Toast.LENGTH_SHORT).show();
}
@Override
public
void
progress(
long
progress,
long
total) {
iHomeView.update(total,progress);
}
@Override
public
void
onStart() {
iHomeView.showLoading();
}
@Override
public
void
onCompleted() {
iHomeView.hideLoading();
}
@Override
public
void
onError(Throwable e) {
//TODO: 对异常的一些处理
e.printStackTrace();
}
};
apiManager.load(url, callBack);
}
|
踩到的坑.
依赖的Retrofit版本一定要保持一致!!!说多了都是泪啊.
保存文件时要使用RxJava的doOnNext操作符,后续更新UI的操作切换到UI线程.
总结 。
看似代码很多,其实过程并不复杂:
在保存文件时,调用ForwardingSource的read方法,通过RxBus发送实时的FileLoadEvent对象.
FileCallBack订阅RxBus发送的FileLoadEvent。通过接收到FileLoadEvent中的下载进度和文件总大小对UI进行更新.
在下载保存文件完成后,取消订阅,防止内存泄漏.
Demo地址:https://github.com/AirMiya/DownloadDemo 。
原文链接:http://www.jianshu.com/p/060d55fc1c82 。
最后此篇关于Retrofit+Rxjava下载文件进度的实现的文章就讲到这里了,如果你想了解更多关于Retrofit+Rxjava下载文件进度的实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!