gpt4 book ai didi

Kotlin SharedFlow - 如何订阅?

转载 作者:行者123 更新时间:2023-12-05 01:59:09 26 4
gpt4 key购买 nike

我有一个生成消息的 JMS 队列。我想与多个 Kotlin 消费者共享这些消息,但前提是连接了 Kotlin 消费者。如果 Kotlin 消费者仅激活 5 分钟,它应该只在该窗口内接收消息。 Kotlin-consumer 应该可以随时订阅,并随时获取收到的消息。

通过阅读文档,我认为 Kotlin 的 SharedFlow 是最好的方法...

"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)

但我找不到任何好的例子,文档也很困惑。 SharedFlow 文档说“所有收集器都获取所有发出的值”和“共享流的事件收集器称为订阅者”,但它没有解释如何实际创建订阅者。

选项:

  • shareIn 说它将“冷流转换为热 SharedFlow”,但我没有冷流,我有热 SharedFlow。
  • Flow.collect在文档中有链接,但它被标记为内部:“这是一个内部 kotlinx.coroutines API,不应从 kotlinx.coroutines 外部使用。”
  • launchIn 被描述为终端 - 但我不想结束消费
amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
override suspend fun emit(value: Message) { ... }
})
  • Flow.collectlaunchIn“从未正常完成”- 但我确实希望能够正常完成它们。

这是我尝试订阅消息的方式,但我永远得不到任何结果。


import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
produceMessages()
delay(1000)
}

suspend fun produceMessages() = coroutineScope {

val messages = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)

// emit messages
launch {
repeat(100000) {
println("emitting $it - result:${messages.tryEmit(it)}")
delay(Duration.seconds(0.5))
}
}

println("waiting 3")
delay(Duration.seconds(3))

launch {
messages.onEach { println("onEach") }
}
launch {
messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
}
launch {
messages.collect { println("collect") }
}

launch {
messages.launchIn(this)
messages.collect { println("launchIn + collect") }
}

launch {
val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
delay(Duration.seconds(2))
println("new.replayCache: ${new.replayCache}")
}

launch {
println("sharing")
val l = mutableListOf<Int>()
val x = messages.onEach { println("hello") }.launchIn(this)

repeat(1000) {
delay(Duration.seconds(1))

println("result $it: ${messages.replayCache}")
println("result $it: ${messages.subscriptionCount.value}")
println("result $it: ${l}")
}
}
}

更新

我有一个可行的解决方案。感谢 Tenfour04 的回答,这帮助我理解了。

这是一个接近我需要的示例。

import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())

private val messagesFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)

init {
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
messagesFlow.emit(it)
delay(Duration.seconds(0.5))
}
}
}

/** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
fun listen(name: String): SharedFlow<Int> = runBlocking {

val listenerScope = CoroutineScope(SupervisorJob())

val capture = MutableSharedFlow<Int>(
replay = Int.MAX_VALUE,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)

messagesFlow
.onEach {
println("$name is getting message $it")
capture.emit(it)
}
.launchIn(listenerScope)

capture.asSharedFlow()
}

/** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
suspend fun collectState(name: String): StateFlow<List<Int>> {
return messagesFlow
.runningFold(emptyList<Int>()) { acc, value ->
println("$name is getting message $value")
acc + value
}
.stateIn(publishingScope)
}

}

fun main() {
val publisher = Publisher()

// both Fish and Llama can subscribe at any point, and get all subsequent values

runBlocking {
delay(Duration.seconds(2))

launch {
val listenerFish = publisher.collectState("Fish")
repeat(4) {
println("$it. Fish replayCache ${listenerFish.value}")
delay(Duration.seconds(2))
}
}

delay(Duration.seconds(2))

launch {
val listenerLlama = publisher.listen("Llama")
repeat(4) {
println("$it. Llama replayCache" + listenerLlama.replayCache)
delay(Duration.seconds(2))
}
}

delay(Duration.seconds(10))
}
}

最佳答案

Flow.collect 有一个标记为内部的重载,但有一个公共(public)的 collect 扩展函数非常常用。我建议将这个包罗万象的导入放在文件的顶部,然后扩展函数将在其他 Flow 相关任务中可用:import kotlinx.coroutines.flow.*

launchIncollect 是订阅流的两种最常见的方式。他们都是终端。 “终端”并不意味着它结束 消费……它意味着它开始消费! “非终结”函数是将一个 Flow 包装在另一个 Flow 中而不开始收集它的函数。

“Never complete normally”表示协程中它后面的代码将不会到达。 collect 订阅流并暂停协程直到流完成。由于 SharedFlow 永远不会完成,因此它“永远不会正常完成”。

很难对您的代码发表评论,因为启动您的流程并将其收集在同一个函数中是不寻常的。通常,SharedFlow 将作为属性公开以供其他函数使用。通过将它们全部组合到一个函数中,您隐藏了一个事实,即通常 SharedFlow 可能从不同的协程范围发布而不是从中收集。

这是一个部分改编自您的代码的示例:

class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())

val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
).also { flow ->
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
flow.emit(it)
delay(500)
}
}
}
}

fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())

// Delay a while. We'll miss the first couple messages.
delay(1300)

// Subscribe to the shared flow
subscribingScope.launch {
publisher.messagesFlow.collect { println("I am colllecting message $it") }
// Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
}

delay(3000) // Keep app alive for a while
}
}

由于 collect 通常会阻止它下面的任何代码在协程中运行,因此 launchIn 函数可以使正在发生的事情更加明显,并且更加简洁:

fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())

delay(1300)

publisher.messagesFlow.onEach { println("I am colllecting message $it") }
.launchIn(subscribingScope)

delay(3000)
}
}

关于Kotlin SharedFlow - 如何订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68039394/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com