作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个(Flowable
)项目流要使用单个公共(public)资源并行处理,并且之后必须处置该资源。我尝试使用 Single.using()
运算符,但它甚至在处理流中的第一个项目之前就释放了资源。
示例程序(Kotlin 语言):
package my.test.rx_task_queue
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger
object TestCommonResource {
private val logger = LoggerFactory.getLogger(TestCommonResource::class.java)
@JvmStatic
fun main(args: Array<String>) {
val queue = Flowable.fromIterable(1..5)
val resIdx = AtomicInteger(0)
val resource = Single.using({
val res = "resource-${resIdx.incrementAndGet()}"
logger.info("Init resource $res")
res
}, { res ->
Single.just(res)
}, { res ->
logger.info("Dispose resource $res")
}, false)
val result = resource.flatMap { res ->
queue.flatMapSingle({ item ->
Single.fromCallable {
logger.info("Process $item with $res")
"$item @ $res"
}
.subscribeOn(Schedulers.io())
}, false, 2)
.toList()
}
.blockingGet()
logger.info("Result: $result")
}
}
日志输出示例:
14:30:27.721 [main] INFO my.test.rx_task_queue.TestCommonResource - Init resource resource-1
14:30:27.744 [main] INFO my.test.rx_task_queue.TestCommonResource - Dispose resource resource-1
14:30:27.747 [RxCachedThreadScheduler-1] INFO my.test.rx_task_queue.TestCommonResource - Process 1 with resource-1
14:30:27.747 [RxCachedThreadScheduler-2] INFO my.test.rx_task_queue.TestCommonResource - Process 2 with resource-1
14:30:27.748 [RxCachedThreadScheduler-3] INFO my.test.rx_task_queue.TestCommonResource - Process 3 with resource-1
14:30:27.749 [RxCachedThreadScheduler-4] INFO my.test.rx_task_queue.TestCommonResource - Process 4 with resource-1
14:30:27.749 [RxCachedThreadScheduler-1] INFO my.test.rx_task_queue.TestCommonResource - Process 5 with resource-1
14:30:27.750 [main] INFO my.test.rx_task_queue.TestCommonResource - Result: [1 @ resource-1, 2 @ resource-1, 3 @ resource-1, 4 @ resource-1, 5 @ resource-1]
使用 Flowable.parallel()
而不是 flatMap()
会得到相同的结果。
最佳答案
处置随着源的处置而发生,因此如果您想在完成所有操作后进行处置,您只需让 singleFunction
返回整个流:
object TestCommonResource {
private val logger = LoggerFactory.getLogger(TestCommonResource::class.java)
@JvmStatic
fun main(args: Array<String>) {
val queue = Flowable.fromIterable(1..5)
val resIdx = AtomicInteger(0)
val result = Single.using({
val res = "resource-${resIdx.incrementAndGet()}"
logger.info("Init resource $res")
res
}, { res ->
queue.flatMapSingle({ item ->
Single.fromCallable {
logger.info("Process $item with $res")
"$item @ $res"
}
.subscribeOn(Schedulers.io())
}, false, 2)
.toList()
}, { res ->
logger.info("Dispose resource $res")
}, false)
.blockingGet()
logger.info("Result: $result")
}
}
关于java - rxjava 2 : how to dispose a resource after downstream completes processing,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56824566/
我是一名优秀的程序员,十分优秀!