
Android RxJava2: Flowable and CompositeDisposable

Reposted. Author: 行者123. Updated: 2023-11-29 14:30:10

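I have a question about Flowables and adding them to a CompositeDisposable. I want to switch from an Observable to a Flowable because the operation may emit 1000 or more values. I'm not very familiar with RxJava2, so please forgive me if this question is silly :)

So far I have been using an Observable like this: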

public Observable<String> uploadPictureRx(String path)
{
    return Observable.create(new ObservableOnSubscribe<String>()
    {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception
        {
            Uri file = Uri.fromFile(new File(path));
            String segment = file.getLastPathSegment();
            UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);

            uploadTask.addOnFailureListener(new OnFailureListener()
            {
                @Override
                public void onFailure(@NonNull Exception exception)
                {
                    e.onError(exception);
                }
            }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    downloadUrl = taskSnapshot.getDownloadUrl();
                    String url = downloadUrl.getPath();
                    e.onNext(url);
                    e.onComplete();
                }
            }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    long bytes = taskSnapshot.getBytesTransferred();
                    String bytesS = String.valueOf(bytes);
                    e.onNext(bytesS);
                }
            });
        }
    });
}

And then I call the method like this:

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new DisposableObserver<String>()
            {
                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}

This works fine! However, when I try to do the same thing with a Flowable instead of an Observable, it does not compile:

public Flowable<String> uploadPictureRx(String path)
{
    return Flowable.create(new FlowableOnSubscribe<String>()
    {
        @Override
        public void subscribe(FlowableEmitter<String> e) throws Exception
        {
            Uri file = Uri.fromFile(new File(path));
            String segment = file.getLastPathSegment();
            UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);

            uploadTask.addOnFailureListener(new OnFailureListener()
            {
                @Override
                public void onFailure(@NonNull Exception exception)
                {
                    e.onError(exception);
                }
            }).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    downloadUrl = taskSnapshot.getDownloadUrl();
                    String url = downloadUrl.getPath();
                    e.onNext(url);
                    e.onComplete();
                }
            }).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
            {
                @Override
                public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
                {
                    //noinspection VisibleForTests
                    long bytes = taskSnapshot.getBytesTransferred();
                    String bytesS = String.valueOf(bytes);
                    e.onNext(bytesS);
                }
            });
        }
    }, BackpressureStrategy.BUFFER);
}

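The error is: Inferred type 'E' for type parameter 'E' is not within its bound; should implement 'org.reactivestreams.Subscriber

My guess is that Flowable does not implement Disposable and that this is why it doesn't compile. I don't know whether that is true; it is just my best guess so far. Or do I have to change subscribeWith() to subscribe()? I don't know what effect that change would have.

In any case, I would really appreciate suggestions on how to make this work and get this Flowable into my CompositeDisposable.

Thanks, everyone!

Edit:

I tried changing the DisposableObserver to a Subscriber, but that leads to the following error: Compiler Error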

Best answer

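For backpressure reasons, a Flowable works with a Subscription rather than a Disposable: the subscriber calls Subscription.request() to tell the source how many items it wants at that moment. That is why Flowable.subscribeWith() expects an org.reactivestreams.Subscriber and rejects a DisposableObserver, which is only an Observer. ResourceSubscriber is a Subscriber that also implements Disposable, so the value returned by subscribeWith() can still be added to the CompositeDisposable.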
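To make the role of Subscription.request() concrete, here is a minimal sketch that does not use RxJava at all. It relies on the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces that Flowable is built on. RangePublisher and BatchSubscriber are hypothetical names invented for this illustration, and the publisher is deliberately synchronous and single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class RequestSketch {

    /** Hypothetical synchronous source: emits 1..count, but never more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the outer loop drains it
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical subscriber: asks for `batch` items at a time and records what happened. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final List<Long> requests = new ArrayList<>();
        boolean completed = false;
        Throwable error;
        private final long batch;
        private long outstanding = 0;
        private Flow.Subscription subscription;

        BatchSubscriber(long batch) { this.batch = batch; }

        private void ask() {
            outstanding = batch;
            requests.add(batch);
            subscription.request(batch);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) { subscription = s; ask(); }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (--outstanding == 0) ask(); // current batch consumed, signal new demand
        }

        @Override
        public void onError(Throwable t) { error = t; }

        @Override
        public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        BatchSubscriber subscriber = new BatchSubscriber(2);
        new RangePublisher(5).subscribe(subscriber);
        System.out.println(subscriber.received); // [1, 2, 3, 4, 5]
        System.out.println(subscriber.requests); // [2, 2, 2]
    }
}
```

In RxJava 2 itself, ResourceSubscriber requests Long.MAX_VALUE in its default onStart(), so the upload code in this answer never needs to call request() by hand; overriding onStart() and calling the protected request(long) is the place to apply batching like the above.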

Change this code:

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new DisposableObserver<String>()
            {
                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}

to:

private void uploadPicToFireBaseStorage(String path)
{
    compositeDisposable.add(storageService.uploadPictureRx(path)
            .subscribeOn(Schedulers.io())
            .observeOn(mainScheduler)
            .subscribeWith(new ResourceSubscriber<String>()
            {
                @Override
                public void onNext(String s)
                {
                    String ss = s;
                    System.out.println(ss);
                }

                @Override
                public void onError(Throwable e)
                {
                    e.printStackTrace();
                }

                @Override
                public void onComplete()
                {
                    view.displayToast("Picture Upload completed");
                }
            })
    );
}
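Why a subscriber that is also a Disposable fits into a composite can be sketched without RxJava as well. The following is only an analogy built on the JDK's java.util.concurrent.Flow interfaces: Disposable, Composite, CollectingSubscriber and ManualSource are hypothetical stand-ins written for this illustration, not the RxJava classes, and the source ignores demand to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DisposableSubscriberSketch {

    /** Hypothetical stand-in for RxJava's Disposable. */
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    /** Hypothetical stand-in for CompositeDisposable: disposes everything that was added. */
    static final class Composite {
        private final List<Disposable> items = new ArrayList<>();

        void add(Disposable d) { items.add(d); }

        void clear() {
            for (Disposable d : items) d.dispose();
            items.clear();
        }
    }

    /** A subscriber that is also a Disposable; dispose() cancels the upstream Subscription. */
    static final class CollectingSubscriber implements Flow.Subscriber<String>, Disposable {
        final List<String> received = new ArrayList<>();
        private Flow.Subscription upstream;
        private boolean disposed = false;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            if (disposed) s.cancel();
            else s.request(Long.MAX_VALUE); // unbounded demand, no batching in this sketch
        }

        @Override
        public void onNext(String item) { if (!disposed) received.add(item); }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }

        @Override
        public void dispose() {
            disposed = true;
            if (upstream != null) upstream.cancel();
        }

        @Override
        public boolean isDisposed() { return disposed; }
    }

    /** Hypothetical source driven by hand; it records whether the subscriber cancelled. */
    static final class ManualSource implements Flow.Publisher<String> {
        private Flow.Subscriber<? super String> subscriber;
        boolean cancelled = false;

        @Override
        public void subscribe(Flow.Subscriber<? super String> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(long n) { } // demand is not tracked in this sketch

                @Override
                public void cancel() { cancelled = true; }
            });
        }

        void push(String value) {
            if (!cancelled && subscriber != null) subscriber.onNext(value);
        }
    }

    public static void main(String[] args) {
        ManualSource source = new ManualSource();
        CollectingSubscriber subscriber = new CollectingSubscriber();
        Composite composite = new Composite();

        composite.add(subscriber);      // possible only because the subscriber is a Disposable
        source.subscribe(subscriber);
        source.push("10");
        composite.clear();              // disposes the subscriber, which cancels upstream
        source.push("20");              // dropped: the source was cancelled

        System.out.println(subscriber.received); // [10]
        System.out.println(source.cancelled);    // true
    }
}
```

In the Android code above, this corresponds to calling compositeDisposable.clear() (or dispose()) when the screen goes away, which cancels the Flowable's subscription through the ResourceSubscriber.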

Regarding Android RxJava2 Flowable and CompositeDisposable, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43169008/
