gpt4 book ai didi

Retrofit+Rxjava下载文件进度的实现

转载 作者:qq735679552 更新时间:2022-09-28 22:32:09 26 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Retrofit+Rxjava下载文件进度的实现由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

前言 。

最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义。点我查看.

准备工作 。

本文采用Dagger2,Retrofit,RxJava.

?
1
2
3
4
5
6
7
8
9
10
compile 'com.squareup.retrofit2:retrofit:2.0.2'
compile 'com.squareup.retrofit2:converter-gson:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'
//dagger2
compile 'com.google.dagger:dagger:2.6'
apt 'com.google.dagger:dagger-compiler:2.6'
//RxJava
compile 'io.reactivex:rxandroid:1.2.0'
compile 'io.reactivex:rxjava:1.1.5'
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'

改造ResponseBody 。

okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ProgressResponseBody extends ResponseBody {
  private ResponseBody responseBody;
  private BufferedSource bufferedSource;
  public ProgressResponseBody(ResponseBody responseBody) {
  this .responseBody = responseBody;
  }
  @Override
  public MediaType contentType() {
  return responseBody.contentType();
  }
  @Override
  public long contentLength() {
  return responseBody.contentLength();
  }
  @Override
  public BufferedSource source() {
  if (bufferedSource == null ) {
   bufferedSource = Okio.buffer(source(responseBody.source()));
  }
  return bufferedSource;
  }
  private Source source(Source source) {
  return new ForwardingSource(source) {
   long bytesReaded = 0 ;
   @Override
   public long read(Buffer sink, long byteCount) throws IOException {
   long bytesRead = super .read(sink, byteCount);
   bytesReaded += bytesRead == - 1 ? 0 : bytesRead;
   //实时发送当前已读取的字节和总字节
   RxBus.getInstance().post( new FileLoadEvent(contentLength(), bytesReaded));
   return bytesRead;
   }
  };
  }
}

呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象.

FileLoadEvent 。

FileLoadEvent很简单,包含了当前已加载进度和文件总大小.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FileLoadEvent {
  long total;
  long bytesLoaded;
  public long getBytesLoaded() {
  return bytesLoaded;
  }
  public long getTotal() {
  return total;
  }
  public FileLoadEvent( long total, long bytesLoaded) {
  this .total = total;
  this .bytesLoaded = bytesLoaded;
  }
}

RxBus 。

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class RxBus {
  private static volatile RxBus mInstance;
  private SerializedSubject<Object, Object> mSubject;
  private HashMap<String, CompositeSubscription> mSubscriptionMap;
  /**
  * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
  * Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,
  * 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。
  */
  private RxBus() {
  mSubject = new SerializedSubject<>(PublishSubject.create());
  }
  /**
  * 单例 双重锁
  * @return
  */
  public static RxBus getInstance() {
  if (mInstance == null ) {
   synchronized (RxBus. class ) {
   if (mInstance == null ) {
    mInstance = new RxBus();
   }
   }
  }
  return mInstance;
  }
  /**
  * 发送一个新的事件
  * @param o
  */
  public void post(Object o) {
  mSubject.onNext(o);
  }
  /**
  * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
  * @param type
  * @param <T>
  * @return
  */
  public <T> Observable<T> tObservable( final Class<T> type) {
  //ofType操作符只发射指定类型的数据,其内部就是filter+cast
  return mSubject.ofType(type);
  }
  public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
  return tObservable(type)
   .onBackpressureBuffer()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(next, error);
  }
  public void addSubscription(Object o, Subscription subscription) {
  if (mSubscriptionMap == null ) {
   mSubscriptionMap = new HashMap<>();
  }
  String key = o.getClass().getName();
  if (mSubscriptionMap.get(key) != null ) {
   mSubscriptionMap.get(key).add(subscription);
  } else {
   CompositeSubscription compositeSubscription = new CompositeSubscription();
   compositeSubscription.add(subscription);
   mSubscriptionMap.put(key, compositeSubscription);
   // Log.e("air", "addSubscription:订阅成功 " );
  }
  }
  public void unSubscribe(Object o) {
  if (mSubscriptionMap == null ) {
   return ;
  }
  String key = o.getClass().getName();
  if (!mSubscriptionMap.containsKey(key)) {
   return ;
  }
  if (mSubscriptionMap.get(key) != null ) {
   mSubscriptionMap.get(key).unsubscribe();
  }
  mSubscriptionMap.remove(key);
  //Log.e("air", "unSubscribe: 取消订阅" );
  }
}

FileCallBack 。

那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public abstract class FileCallBack<T> {
  private String destFileDir;
  private String destFileName;
  public FileCallBack(String destFileDir, String destFileName) {
  this .destFileDir = destFileDir;
  this .destFileName = destFileName;
  subscribeLoadProgress();
  }
  public abstract void onSuccess(T t);
  public abstract void progress( long progress, long total);
  public abstract void onStart();
  public abstract void onCompleted();
  public abstract void onError(Throwable e);
  public void saveFile(ResponseBody body) {
  InputStream is = null ;
  byte [] buf = new byte [ 2048 ];
  int len;
  FileOutputStream fos = null ;
  try {
   is = body.byteStream();
   File dir = new File(destFileDir);
   if (!dir.exists()) {
   dir.mkdirs();
   }
   File file = new File(dir, destFileName);
   fos = new FileOutputStream(file);
   while ((len = is.read(buf)) != - 1 ) {
   fos.write(buf, 0 , len);
   }
   fos.flush();
   unsubscribe();
   //onCompleted();
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } finally {
   try {
   if (is != null ) is.close();
   if (fos != null ) fos.close();
   } catch (IOException e) {
   Log.e( "saveFile" , e.getMessage());
   }
  }
  }
  /**
  * 订阅加载的进度条
  */
  public void subscribeLoadProgress() {
  Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent. class , new Action1<FileLoadEvent>() {
   @Override
   public void call(FileLoadEvent fileLoadEvent) {
   progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal());
   }
  }, new Action1<Throwable>() {
   @Override
   public void call(Throwable throwable) {
   //TODO 对异常的处理
   }
  });
  RxBus.getInstance().addSubscription( this , subscription);
  }
  /**
  * 取消订阅,防止内存泄漏
  */
  public void unsubscribe() {
  RxBus.getInstance().unSubscribe( this );
  }
}

开始下载 。

使用自己的ProgressResponseBody 。

通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度.

?
1
2
3
4
5
6
7
8
9
public class ProgressInterceptor implements Interceptor {
  @Override
  public Response intercept(Chain chain) throws IOException {
  Response originalResponse = chain.proceed(chain.request());
  return originalResponse.newBuilder()
   .body( new ProgressResponseBody(originalResponse.body()))
   .build();
  }
}

构建Retrofit 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Module
public class ApiModule {
  @Provides
  @Singleton
  public OkHttpClient provideClient() {
  OkHttpClient client = new OkHttpClient.Builder()
   .addInterceptor( new ProgressInterceptor())
   .build();
  return client;
  }
  @Provides
  @Singleton
  public Retrofit provideRetrofit(OkHttpClient client){
  Retrofit retrofit = new Retrofit.Builder()
   .client(client)
   .baseUrl(Constant.HOST)
   .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
   .addConverterFactory(GsonConverterFactory.create())
   .build();
  return retrofit;
  }
  @Provides
  @Singleton
  public ApiInfo provideApiInfo(Retrofit retrofit){
  return retrofit.create(ApiInfo. class );
  }
  @Provides
  @Singleton
  public ApiManager provideApiManager(Application application, ApiInfo apiInfo){
  return new ApiManager(application,apiInfo);
  }
}

请求接口 。

?
1
2
3
4
5
public interface ApiInfo {
  @Streaming
  @GET
  Observable<ResponseBody> download( @Url String url);
}

执行请求 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
public void load(String url, final FileCallBack<ResponseBody> callBack){
  apiInfo.download(url)
   .subscribeOn(Schedulers.io()) //请求网络 在调度者的io线程
   .observeOn(Schedulers.io()) //指定线程保存文件
   .doOnNext( new Action1<ResponseBody>() {
    @Override
    public void call(ResponseBody body) {
    callBack.saveFile(body);
    }
   })
   .observeOn(AndroidSchedulers.mainThread()) //在主线程中更新ui
   .subscribe( new FileSubscriber<ResponseBody>(application,callBack));
  }

在presenter层中执行网络请求.

通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void load(String url){
  String fileName = "app.apk" ;
  String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath();
  Log.e(TAG, "load: " +fileStoreDir.toString() );
  FileCallBack<ResponseBody> callBack = new FileCallBack<ResponseBody>(fileStoreDir,fileName) {
   @Override
   public void onSuccess( final ResponseBody responseBody) {
   Toast.makeText(App.getInstance(), "下载文件成功" ,Toast.LENGTH_SHORT).show();
   }
   @Override
   public void progress( long progress, long total) {
   iHomeView.update(total,progress);
   }
   @Override
   public void onStart() {
   iHomeView.showLoading();
   }
   @Override
   public void onCompleted() {
   iHomeView.hideLoading();
   }
   @Override
   public void onError(Throwable e) {
   //TODO: 对异常的一些处理
   e.printStackTrace();
   }
  };
  apiManager.load(url, callBack);
  }

踩到的坑.

依赖的Retrofit版本一定要保持一致!!!说多了都是泪啊.

保存文件时要使用RxJava的doOnNext操作符,后续更新UI的操作切换到UI线程.

总结 。

看似代码很多,其实过程并不复杂:

在保存文件时,调用ForwardingSource的read方法,通过RxBus发送实时的FileLoadEvent对象.

FileCallBack订阅RxBus发送的FileLoadEvent。通过接收到FileLoadEvent中的下载进度和文件总大小对UI进行更新.

在下载保存文件完成后,取消订阅,防止内存泄漏.

Demo地址:https://github.com/AirMiya/DownloadDemo 。

原文链接:http://www.jianshu.com/p/060d55fc1c82 。

最后此篇关于Retrofit+Rxjava下载文件进度的实现的文章就讲到这里了,如果你想了解更多关于Retrofit+Rxjava下载文件进度的实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com