gpt4 book ai didi

kotlin - 具有自定义计数标准的RxJava缓冲区/窗口

转载 作者:行者123 更新时间:2023-12-02 12:42:40 27 4
gpt4 key购买 nike

我有一个Observable,它发出许多对象,我想使用windowbuffer操作对这些对象进行分组。但是,我不想指定count参数来确定窗口中应包含多少个对象,而是希望能够使用自定义条件。

例如,假设可观察对象正在发出Message类的实例,如下所示。

class Message(
val int size: Int
)

我想根据其 size变量而不只是其计数来缓冲或窗口化消息实例。例如,要获取总大小最大为5000的消息窗口。
// Something like this
readMessages()
.buffer({ message -> message.size }, 5000)

是否有捷径可寻?

最佳答案

首先,我必须承认,我不是RxJava专家。
我刚刚发现您的问题具有挑战性,并试图找到解决方案。

有一个带有参数window()boundaryIndicator函数。如果达到窗口大小,则必须创建一个发出项目的Publisher / Flowable

在示例中,我创建了一个对象windowManager用作boundaryIndicator。在onNext回调中,我调用windowManager并给它机会打开一个新窗口。

val windowManager = object {
lateinit var emitter: FlowableEmitter<Unit>
var windowSize: Long = 0

fun createEmitter(emitter: FlowableEmitter<Unit>) {
this.emitter = emitter
}

fun openWindowIfRequired(size: Long) {
windowSize += size
if (windowSize > 5) {
windowSize = 0
emitter.onNext(Unit)
}
}
}

val windowBoundary = Flowable.create<Unit>(windowManager::createEmitter, BackpressureStrategy.ERROR)

Flowable.interval(1, TimeUnit.SECONDS).window(windowBoundary).subscribe {
it.doOnNext {
windowManager.openWindowIfRequired(it)
}.doOnSubscribe {
println("Open window")
}.doOnComplete {
println("Close window")
}.subscribe {
println(it)
}
}

关于kotlin - 具有自定义计数标准的RxJava缓冲区/窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52791691/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com