gpt4 book ai didi

java - RxJava Observable 到 Completable,如何避免 toBlocking()

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:13:39 25 4
gpt4 key购买 nike

我目前在 Android 上使用 RxJava 和 Kotlin,但我有一个问题,如果不使用 toBlocking() 就无法解决。

我在员工服务中有一个返回 Observable 的方法>:

fun all(): Observable<List<Employee>>

这一切都很好,因为只要员工发生变化,这个 Observable 就会发出新的员工列表。但我想从员工那里生成一个 PDF 文件,显然不需要在每次员工变动时都运行。另外,我想从我的 PDF 生成器方法返回一个 Completable 对象。我想在我的 PDF 中添加一个标题,然后遍历员工并计算每个员工的工资,这也返回一个 Observable,这就是我现在使用 toBlocking 的地方。我目前的做法是:

private fun generatePdf(outputStream: OutputStream): Completable {
return employeeService.all().map { employees ->
try {
addHeaderToPDF()
for (i in employees) {
val calculated = employeeService.calculateWage(i.id).toBlocking().first()
// Print calculated to PDF....
}
addFooterToPDF()
return @map Completable.complete()
}
catch (e: Exception) {
return @map Completable.error(e)
}
}.first().toCompletable()

有没有什么方法可以使用 RxJava 使这段代码更简洁一些?

提前致谢!

最佳答案

免责声明:此答案正在进行中。


基本前提:如果你有 blocking在流中,你做错了。

注意:任何状态都不得离开可观察的 lambda。

第 1 步:流式传输整个数据集

输入是员工流。对于每个员工,您需要获得一份工资。让我们把它变成一个流。

/**
* @param employeesObservable
* Stream of employees we're interested in.
* @param wageProvider
* Transformation function which takes an employee and returns a [Single] of their wage.
* @return
* Observable stream spitting individual [Pair]s of employees and their wages.
*/
fun getEmployeesAndWagesObservable(
employeesObservable: Observable<Employee>,
wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>>? {
val employeesAndWagesObservable: Observable<Pair<Employee, Int>>

// Each Employee from the original stream will be converted
// to a Single<Pair<Employee, Int>> via flatMapSingle operator.
// Remember, we need a stream and Single is a stream.
employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
// We need to get a source of wage value for current employee.
// That source emits a single Int or errors.
val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)

// Once the wage from said source is loaded...
val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage ->
// ... construct a Pair<Employee, Int>
employee to wage
}

// This code is not executed now. It will be executed for each Employee
// after the original Observable<Employee> starts spitting out items.
// After subscribing to the resulting observable.
return@flatMapSingle employeeAndWageSingle
}

return employeesAndWagesObservable
}

订阅后会发生什么:

  1. 从源头聘用一名员工。
  2. 获取员工的工资。
  3. 吐出一对雇员和他们的工资。

重复此过程直到 employeesObservable信号 onComplete或者因 onError 而失败.

使用的运算符:

  • flatMapSingle : 将实际值转换为一些转换值的新单一流。
  • map : 将实际值转换为其他一些实际值(无嵌套流)。

看看你如何将它连接到你的代码:

fun doStuff() {
val employeesObservable = employeeService.all()
val wageProvider = Function<Employee, Single<Int>> { employee ->
// Don't listen to changes. Take first wage and use that.
employeeService.calculateWage(employee.id).firstOrError()
}

val employeesAndWagesObservable =
getEmployeesAndWagesObservable(employeesObservable, wageProvider)

// Subscribe...
}

使用的运算符:

  • first : 从 observable 中取出第一个项目并将其转换为单个流。
  • timeout : 一个好主意是 .timeout如果您通过网络获得工资。

后续步骤

选项 1:在此结束

不要订阅,打电话

val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }

并以同步方式处理每个项目。坐下来,找出下一步,观看演示文稿,阅读示例。

选项 2:添加图层

  1. .map每一个Pair<Employee, Int>一些抽象的 PDF 构建 block 。
  2. 通过 Observable.fromCallable { ... } 将您的页眉和页脚打印机转为 Observables ,让他们也返回 PDF 构建 block 。
  3. 通过 Observable.concat(headerObs, employeeDataObs, footerObs) 按顺序合并所有这些
  4. .subscribe为此结果并开始将 PDF 构建 block 写入 PDF 编写器。
  5. 待办事项:
    • 找出一种在订阅时延迟初始化 PDF 编写器的方法(而不是在构建流之前),
    • 删除出错的输出,
    • 在完成或出错时关闭输出流。

关于java - RxJava Observable 到 Completable,如何避免 toBlocking(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41534995/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com