- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个生成消息的 JMS 队列。我想与多个 Kotlin 消费者共享这些消息,但前提是连接了 Kotlin 消费者。如果 Kotlin 消费者仅激活 5 分钟,它应该只在该窗口内接收消息。 Kotlin-consumer 应该可以随时订阅,并随时获取收到的消息。
通过阅读文档,我认为 Kotlin 的 SharedFlow
是最好的方法...
"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)
但我找不到任何好的例子,文档也很困惑。 SharedFlow
文档说“所有收集器都获取所有发出的值”和“共享流的事件收集器称为订阅者”,但它没有解释如何实际创建订阅者。
选项:
shareIn
说它将“冷流转换为热 SharedFlow”,但我没有冷流,我有热 SharedFlow。Flow.collect
在文档中有链接,但它被标记为内部:“这是一个内部 kotlinx.coroutines API,不应从 kotlinx.coroutines 外部使用。”launchIn
被描述为终端 - 但我不想结束消费amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
override suspend fun emit(value: Message) { ... }
})
Flow.collect
和 launchIn
“从未正常完成”- 但我确实希望能够正常完成它们。这是我尝试订阅消息的方式,但我永远得不到任何结果。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
suspend fun main() = coroutineScope {
produceMessages()
delay(1000)
}
suspend fun produceMessages() = coroutineScope {
val messages = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
// emit messages
launch {
repeat(100000) {
println("emitting $it - result:${messages.tryEmit(it)}")
delay(Duration.seconds(0.5))
}
}
println("waiting 3")
delay(Duration.seconds(3))
launch {
messages.onEach { println("onEach") }
}
launch {
messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
}
launch {
messages.collect { println("collect") }
}
launch {
messages.launchIn(this)
messages.collect { println("launchIn + collect") }
}
launch {
val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
delay(Duration.seconds(2))
println("new.replayCache: ${new.replayCache}")
}
launch {
println("sharing")
val l = mutableListOf<Int>()
val x = messages.onEach { println("hello") }.launchIn(this)
repeat(1000) {
delay(Duration.seconds(1))
println("result $it: ${messages.replayCache}")
println("result $it: ${messages.subscriptionCount.value}")
println("result $it: ${l}")
}
}
}
我有一个可行的解决方案。感谢 Tenfour04 的回答,这帮助我理解了。
这是一个接近我需要的示例。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
private val messagesFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
init {
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
messagesFlow.emit(it)
delay(Duration.seconds(0.5))
}
}
}
/** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
fun listen(name: String): SharedFlow<Int> = runBlocking {
val listenerScope = CoroutineScope(SupervisorJob())
val capture = MutableSharedFlow<Int>(
replay = Int.MAX_VALUE,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
messagesFlow
.onEach {
println("$name is getting message $it")
capture.emit(it)
}
.launchIn(listenerScope)
capture.asSharedFlow()
}
/** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
suspend fun collectState(name: String): StateFlow<List<Int>> {
return messagesFlow
.runningFold(emptyList<Int>()) { acc, value ->
println("$name is getting message $value")
acc + value
}
.stateIn(publishingScope)
}
}
fun main() {
val publisher = Publisher()
// both Fish and Llama can subscribe at any point, and get all subsequent values
runBlocking {
delay(Duration.seconds(2))
launch {
val listenerFish = publisher.collectState("Fish")
repeat(4) {
println("$it. Fish replayCache ${listenerFish.value}")
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(2))
launch {
val listenerLlama = publisher.listen("Llama")
repeat(4) {
println("$it. Llama replayCache" + listenerLlama.replayCache)
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(10))
}
}
最佳答案
Flow.collect
有一个标记为内部的重载,但有一个公共(public)的 collect
扩展函数非常常用。我建议将这个包罗万象的导入放在文件的顶部,然后扩展函数将在其他 Flow 相关任务中可用:import kotlinx.coroutines.flow.*
launchIn
和 collect
是订阅流的两种最常见的方式。他们都是终端。 “终端”并不意味着它结束 消费……它意味着它开始消费! “非终结”函数是将一个 Flow 包装在另一个 Flow 中而不开始收集它的函数。
“Never complete normally”表示协程中它后面的代码将不会到达。 collect
订阅流并暂停协程直到流完成。由于 SharedFlow 永远不会完成,因此它“永远不会正常完成”。
很难对您的代码发表评论,因为启动您的流程并将其收集在同一个函数中是不寻常的。通常,SharedFlow 将作为属性公开以供其他函数使用。通过将它们全部组合到一个函数中,您隐藏了一个事实,即通常 SharedFlow 可能从不同的协程范围发布而不是从中收集。
这是一个部分改编自您的代码的示例:
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
).also { flow ->
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
flow.emit(it)
delay(500)
}
}
}
}
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
// Delay a while. We'll miss the first couple messages.
delay(1300)
// Subscribe to the shared flow
subscribingScope.launch {
publisher.messagesFlow.collect { println("I am colllecting message $it") }
// Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
}
delay(3000) // Keep app alive for a while
}
}
由于 collect
通常会阻止它下面的任何代码在协程中运行,因此 launchIn
函数可以使正在发生的事情更加明显,并且更加简洁:
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
delay(1300)
publisher.messagesFlow.onEach { println("I am colllecting message $it") }
.launchIn(subscribingScope)
delay(3000)
}
}
关于Kotlin SharedFlow - 如何订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68039394/
我有一个生成消息的 JMS 队列。我想与多个 Kotlin 消费者共享这些消息,但前提是连接了 Kotlin 消费者。如果 Kotlin 消费者仅激活 5 分钟,它应该只在该窗口内接收消息。 Kotl
在我的 ViewModel 中,我发出 API 请求并使用 StateFlow 和 SharedFlow 与 fragment 通信。在发出 API 请求时,我可以轻松地更新状态流的值并成功收集到 f
我有一个 SharedFlow .当ViewModel已创建,我将值更改为 Val1 .之后,我使用 viewModelScope对 3 seconds 进行一些虚假的延迟然后将值更改为 Val2 .
我试图使用 sharedFlow 将事件从 UI 传递到 viewModel 这是我的 View 模型类 class MainActivityViewModel () : ViewModel() {
我有一个使用三个 LiveData 源的 MediatorLiveData。当它们中的任何一个发出一个新值并且我每个都有一个时,我使用这三个值来生成 UI 的输出。 其中两个来源是关于如何对列表进行排
和 有什么区别?共享流 和 状态流 ? 以及如何在 中使用这些MVI 建筑学?使用简单 更好吗?流量 还是这些状态和事件? 最佳答案 Flow 很冷!,这意味着它仅在收集数据时才发出数据。 Flow
我第一次潜入 Kotlin Flow,我想知道 ViewModel 是否还有一席之地。 ViewModel 的优势在于它具有生命周期感知能力,并且会在 Activity 被销毁时自动取消对 ViewM
我有一个具有以下属性的 android viewmodel 类 private val _trainingNavigationEvents = MutableSharedFlow(replay = 0
嘿,我正在学习 kotlin 中的流程。我正在学习 可变状态流 和 可变共享流 .我努力学习 可变状态流 在现实世界的例子中。但我无法获得 可变共享流 例如,它更适合哪个地方。我尝试了一些 可变状态流
我正在合并两个 SharedFlows,然后执行一个长时间的工作操作。 一开始,我知道状态,所以我为两个流发出一个“起始值”。之后,用户可以发送到任一流。 这两个流大多是独立的,但在特定情况下,用户可
我读过类似的主题,但找不到正确的答案: How to end / close a MutableSharedFlow? Kotlin Flow: How to unsubscribe/stop Sta
我是一名优秀的程序员,十分优秀!