gpt4 book ai didi

kotlin - 使用 RxJava 构建 "task queue"的最佳方式

转载 作者:行者123 更新时间:2023-12-02 13:00:20 27 4
gpt4 key购买 nike

目前我正在研究许多与网络相关的功能。目前,我正在处理一个允许我一次发送一条信息的网络 channel ,我必须等待它被确认才能发送下一条信息。我用 1..n 代表服务器连接的客户端。

其中一些消息,我必须以 block 的形式发送,这使用 RxJava 很容易做到。目前我的“写作”方法看起来有点像这样:

fun write(bytes: ByteArray, ignoreMtu: Boolean) = 
server.deviceList()
.first(emptyList())
.flatMapObservable { devices ->
Single.fromCallable {
if (ignoreMtu) {
bytes.size
} else {
devices.minBy { device -> device.mtu }?.mtu ?: DEFAULT_MTU
}
}
.flatMapObservable { minMtu ->
Observable.fromIterable(bytes.asIterable())
.buffer(minMtu)
}
.map { it.toByteArray() }
.doOnNext { server.currentData = bytes }
.map { devices }
// part i've left out: waiting for each device acknowledging the message, timeouts, etc.
}

这里缺少的是我只允许 one 的部分同时发送一条信息。另外,我需要的是,如果我要将一条消息添加到队列中,我必须能够仅观察到这条消息的状态(已完成、错误)。

我想过什么是实现这一目标的最优雅的方式。我提出的解决方案包括例如 PublishSubject<ByteArray>我在其中推送消息(类似队列),添加订阅者并观察它 - 但这会抛出例如 onError如果上一条消息失败。

我想到的另一种方法是在创建/排队时给每条消息一个数字,并有一个全局“消息计数器”Observable,我会用 filter 挂接到链的开头。对于 currently sent message == MY_MESSAGE_ID .但这感觉有点脆弱。我可以在订阅终止时增加计数器,但我确信必须有更好的方法来实现我的目标。

感谢您的帮助。

最佳答案

供将来引用:我发现的最直接的方法是添加一个在单个线程上工作的调度程序,从而按顺序处理每个任务。

关于kotlin - 使用 RxJava 构建 "task queue"的最佳方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49960751/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com