gpt4 book ai didi

java - Rx 运算符。忽略直到发出下一个

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:10:34 27 4
gpt4 key购买 nike

在我的应用程序中,我有一些耗时的逻辑,可以通过多种方式启动,比如自动启动或由用户手动启动。

// Let's describe different event sources as relays
val autoStarts = PublishRelay.create<Unit>()
val manualStarts = PublishRelay.create<Unit>()
val syncStarts = PublishRelay.create<Unit>()

// This is my time consuming operation.
fun longOperation() = Observable.interval(10, TimeUnit.SECONDS).take(1).map { Unit }

val startsDisposable = Observable
.merge(
autoStarts.flatMap { Observable.just(Unit).delay(30, TimeUnit.SECONDS) },
manualStarts
)
.subscribe(syncStarts) // merge emissions of both sources into one

val syncDisposable = syncStarts
.concatMap {
longOperation()
}
.subscribe(autoStarts) // end of long operation trigger start of auto timer

启动继电器会产生很多排放物。假设用户单击按钮进行手动启动,距离定时器自动启动还有 5 秒。如果它是简单的 flatMap,这两个事件都会导致 longOperation() 开始。我只希望一个线程在里面运行 longOperation() ,所以如果它现在正在运行但没有完成 - 忽略开始发射,无论如何完成都会导致计时器重启。

ConcatMap 帮助我完成了一半 - 它将 longOperation() 添加到“队列”,因此它们被一个接一个地处理,但我怎么能写这个来忽略任何进一步开始,直到第一个完全完成?

最佳答案

您可以使用带有额外整数参数的 flatMap() 来限制并行度。

syncStarts
.onBackpressureDrop() // 1
.flatMap(() -> longOperation(), 1) // 2
...
  1. 丢弃 flatMap() 忙碌时发生的任何发射。
  2. 数字 1 是 flatMap() 进行的订阅数,本质上是强制操作按顺序进行。

以上实现了您想要的功能。但是,您没有指定在 longOperation() 运行后您希望发生什么:您是否希望在之后立即开始另一个操作?如果是这样,您需要更改背压处理以最多排队一次发射。

关于java - Rx 运算符。忽略直到发出下一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53815441/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com