- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我对 Flowables 和将它们添加到 compositeDisposables 有疑问。我想从 Observable 切换到 Flowable,因为该操作可能会发出 1000 个或更多值。我对 rxjava2 不太熟悉,所以如果这个问题很愚蠢,请原谅我 :)
到目前为止,我是这样使用 observable 的:
public Observable<String> uploadPictureRx(String path)
{
return Observable.create(new ObservableOnSubscribe<String>()
{
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("SomeChild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
});
}
然后像这样调用方法:
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new DisposableObserver<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
这很好用!但是,当我尝试对 Flowable 而不是 observable 执行相同操作时,它不会编译:
public Flowable<String> uploadPictureRx(String path)
{
return Flowable.create(new FlowableOnSubscribe<String>()
{
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception
{
Uri file = Uri.fromFile(new File(path));
String segment = file.getLastPathSegment();
UploadTask uploadTask = reference.child("somechild").child(segment).putFile(file);
uploadTask.addOnFailureListener(new OnFailureListener()
{
@Override
public void onFailure(@NonNull Exception exception)
{
e.onError(exception);
}
}).addOnSuccessListener(new OnSuccessListener<UploadTask.TaskSnapshot>()
{
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
downloadUrl = taskSnapshot.getDownloadUrl();
String url = downloadUrl.getPath();
e.onNext(url);
e.onComplete();
}
}).addOnProgressListener(new OnProgressListener<UploadTask.TaskSnapshot>()
{
@Override
public void onProgress(UploadTask.TaskSnapshot taskSnapshot)
{
//noinspection VisibleForTests
long bytes = taskSnapshot.getBytesTransferred();
String bytesS = String.valueOf(bytes);
e.onNext(bytesS);
}
});
}
}, BackpressureStrategy.BUFFER);
}
错误是: 类型参数“E”的推断类型“E”不在其范围内;应该实现 'org.reactivestreams.Subscriber
我的猜测是,Flowable 没有实现 Disposable,这就是它无法编译的原因。我不知道那是不是真的,只是我目前最好的猜测。或者我是否必须将 subscribeWith() 更改为 subscribe()?我不知道这种变化会产生什么影响。
无论如何,我非常感谢关于如何使它工作并将这个 Flowable 放入我的 compositedisposable 中的建议。
谢谢大家!
编辑:
尝试将 DisposableObserver 更改为订阅者。但这会导致以下错误: Compiler Error
最佳答案
出于背压的原因,Flowables 使用 Subscription 而不是 Disposable。基本上使用 Subscription.request() 方法来告诉 observable 我在那一刻想要多少项目。
更改代码:
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new DisposableObserver<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
进入
private void uploadPicToFireBaseStorage(String path)
{
compositeDisposable.add(storageService.uploadPictureRx(path)
.subscribeOn(Schedulers.io())
.observeOn(mainScheduler)
.subscribeWith(new ResourceSubscriber<String>()
{
@Override
public void onNext(String s)
{
String ss = s;
System.out.println(ss);
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onComplete()
{
view.displayToast("Picture Upload completed");
}
})
);
}
关于Android rxjava2 Flowable 与 compositedisposable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43169008/
我在 Eclipse 中使用 Spring Boot 和 Flowable API 创建了一个 Flowable 应用程序。现在我的要求是从 Flowable UI 运行该流程。我们是否可以部署现有的
当我在两个由 Flowable.create() 创建的 Flowable (a1, a2) 上使用 Flowable.merge() 时,它只会发出 a2。但是当我合并两个由 Flowable.in
RxJava 2.0.5 引入了 ParallelFlowable类型,对应Flowable.parallel()运算符(operator)。我发现实现并行性的一般建议是使用 flatMap。像这样:
我对 RxJava 还很陌生,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。 我有 Dao,它提供 Flowable在某些范围内到 Da
我是 RxJava2 新手,不确定我是否正确使用它。 我有几个从 TCP 流读取数据的输入流,并将数据读入 byte[]。然后,我采用 byte[] 并为每个 InputStream 创建一个 Flo
我如何从 room 中读取可流动的值列表并将其转换为另一个对象,该对象是 room 中更多值的组合 database.leadsDao().getLeads(leadState.name) .
我有一个用于将 UserScoreTO 列表分组到单个用户对象(多对一关系)的 Rx 流。 public void execute() { getUsers() .fl
如何使用 Flowable 指定流程持续时间?如果从 startEvent 过去,例如30 天,流程不会完成,应该关闭。我将 Spring Boot 与 flowable 一起使用。应该用图表还是代码
我有一个返回 Flowable 的函数接口(interface) interface Iface{ Flowable get(); } 现在让我们假设具体实现是: class IFaceImpl
根据下面的流程,在基数 = 5 的情况下并行执行“simpleLog”(异步)服务任务时,我们遇到了并发问题,此后流程不再继续执行“dummy”(独占)服务任务下一步也不会重试“simpleLog”中
如何中断长期任务? public Flowable simple(int number) { Flowable flowable = Flowable.create(emitter -> {
使用改造,我从服务器获取 Flowable 数据,然后在 View 模型中,我将 flowable 转换为 Livedata,最后观察 Activity 中的 LiveData。在 Activity
我是 Flowable 新手。我尝试创建 BPMN 部署到 MYSQL,并且部署成功。 但是在取消部署过程中,我将所有部署存储在列表中,然后根据我的条件执行取消部署,就像如果 resouceName
我对 RxJava 还很陌生,我需要创建包含多个数据源的存储库。这对我来说很复杂,因为有几个较小的子任务我不知道如何用 RxJava 实现。 首先我有自己写的dao,它处理InputStream,并提
您好,我是 RxJava 新手,我有一个接收 Flowable f2 的类,我需要从中获取值,而不更改任何数据(将值保存到本地缓存)。然后将其与其他 Flowable f1 连接起来并将其发送到更高级
我假设 DefaultAsyncJobExecutor 是默认选择作为 AsyncExecutor 接口(interface)实现的类(不确定这个假设是否正确) 所以基本上我想修改一个异步作业的默认超
我想在 Flowable 中的所有项目发出后发出最终项目。现在我可以通过以下代码实现这一点。 Flowable flowable = getFlowable(); flowable.toList()
我正在使用 reportlab 3.2.0。 SPACER = Spacer(0, 10) buff = BytesIO() doc = SimpleDocTemplate(buff, rightMa
我正在尝试为 Flowable 动态生成工作流文件并在旅途中部署它。 有两个挑战:1. 创建 BAR 文件来打包生成的 XML2. 动态部署。 有人试过吗?如果是,您能否提供帮助或建议替代方案 最佳答
我有一个数据访问对象,它将数据源中的每个项目传递给消费者: public interface Dao { void forEachItem(Consumer item); } 这总是以单线程方
我是一名优秀的程序员,十分优秀!