gpt4 book ai didi

android - Kotlin Flow 和 Websockets 在 Android 上具有干净的架构

转载 作者:行者123 更新时间:2023-12-05 04:39:04 26 4
gpt4 key购买 nike

最近,我们的团队尝试实现 websockets。我们很容易想到在监听事件时使用 Rx,但我想知道没有它怎么办。所以,我们尝试了著名的 Kotlin Flow 但我不知道我们的实现是否正确。

我们应用的架构分为四层:

  • 服务 - 从套接字发出和接收事件,
  • 存储库 - 过滤器、映射、转换等,
  • ViewModel - 填充 LiveData
  • Activity - 观察变化并更新 UI。

因此,我们按如下方式监听接收到服务中的事件:

fun listenMessages(): Flow<List<Message>> = channelFlow {
socket.on("NewMessage") { args ->
val message = gson.fromJson(args[0].toString(), ...)
trySend(message)
}
awaitClose()
}

我们使用 channelFlow 的协程在使用 trySend 接收到事件时发送给消费者,我们通过使用保持此 Flow Activity awaitClose

存储库在捕获 Flow 并将其发送回 ViewModel 后执行一些逻辑:

fun getMessages(): Flow<List<Message>> {
return service.listenMessages()
.filter { ... }
.map { ... }
}

然后,ViewModel 在收集 Flow 时启动协程并更新 LiveData:

fun getMessages() {
viewModelScope.launch(context = Dispatcher.IO) {
repository.getMessages()
.collect {
messagesLiveData.postValue(it)
}
}
}

这很有效,但是这引发了一些问题:

  • 这是正确的实现方式吗?
  • 当我们需要不断聆听时,channelFlow 是正确的选择吗?
  • 在这种情况下,我们是否应该使用经典的 Channel 而不是 Flow(hot vs cold)?

提前感谢您的建议。

最佳答案

channel 的概念意味着作为主要在协程之间通信的原语1 .据我了解,它基于 channelFlow docs ,它在引擎盖下使用一个Channel并将其转换为一个Flow。通过在引擎盖下使用 Channel,有一些重要的事情需要实现:

every value that is sent to the channel is received once. You cannot use channels to distribute events or state updates in a way that allows multiple subscribers to independently receive and react upon them. 1

根据您的架构,这可能无关紧要,但这个例子可能说明一些重要的事情:

fun ws(): Flow<List<Message>> = channelFlow {
println("channelFlow block called")
trySend(listOf(Message(0), Message(1)))
delay(2_000)
trySend(listOf(Message(2)))
}

fun main() = runBlocking {
val source = ws()

launch {
source.collect {
println("First collect got $it")
}
}
launch {
source.collect {
println("Second collect got $it")
}
}
}

生成输出:

channelFlow block called
channelFlow block called
First collect got [Message(id=0), Message(id=1)]
Second collect got [Message(id=0), Message(id=1)]
First collect got [Message(id=2)]
Second collect got [Message(id=2)]

由于 Channel 无法共享,每次在 source 上调用 .collect 时,都会触发 channelFlow body 被召唤!可能更令人惊讶的是,相同的 Flow 是通过 source 引用的,没有对 ws() 的新调用正在进行。

仔细查看 channelFlow 文档,您可以看到这一点

The resulting flow is cold, which means that block is called every time a terminal operator is applied to the resulting flow.2

由于 .collect() 是终端运算符(operator),它会触发对 channelFlow 主体的新调用。

现在,这对您的用例重要吗?我不确定;您将必须弄清楚您是否在多个位置订阅由 listenMessages 生成的流,或者该 channelFlow 的主体是否执行多次开销很大。

作为更一般的建议,我建议更明确地说明您的服务行为。 listenMessages 应该发送给所有订阅的人吗?它应该只发送给第一个可用的(另见 fan-out behavior )吗?如果你更喜欢前者,一个 SharedFlow会被推荐,如果稍后我会明确地直接公开一个 Channel,这样你就不会混淆下游消费者关于我上面显示的问题。

关于android - Kotlin Flow 和 Websockets 在 Android 上具有干净的架构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70477082/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com