- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个生成消息的 JMS 队列。我想与多个 Kotlin 消费者共享这些消息,但前提是连接了 Kotlin 消费者。如果 Kotlin 消费者仅激活 5 分钟,它应该只在该窗口内接收消息。 Kotlin-consumer 应该可以随时订阅,并随时获取收到的消息。
通过阅读文档,我认为 Kotlin 的 SharedFlow
是最好的方法...
"SharedFlow is useful for broadcasting events that happen inside an application to subscribers that can come and go." (docs)
但我找不到任何好的例子,文档也很困惑。 SharedFlow
文档说“所有收集器都获取所有发出的值”和“共享流的事件收集器称为订阅者”,但它没有解释如何实际创建订阅者。
选项:
shareIn
说它将“冷流转换为热 SharedFlow”,但我没有冷流,我有热 SharedFlow。Flow.collect
在文档中有链接,但它被标记为内部:“这是一个内部 kotlinx.coroutines API,不应从 kotlinx.coroutines 外部使用。”launchIn
被描述为终端 - 但我不想结束消费amqMessageListener.messagesView.collect(object : FlowCollector<Message> { // internal API warning
override suspend fun emit(value: Message) { ... }
})
Flow.collect
和 launchIn
“从未正常完成”- 但我确实希望能够正常完成它们。这是我尝试订阅消息的方式,但我永远得不到任何结果。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
suspend fun main() = coroutineScope {
produceMessages()
delay(1000)
}
suspend fun produceMessages() = coroutineScope {
val messages = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
// emit messages
launch {
repeat(100000) {
println("emitting $it - result:${messages.tryEmit(it)}")
delay(Duration.seconds(0.5))
}
}
println("waiting 3")
delay(Duration.seconds(3))
launch {
messages.onEach { println("onEach") }
}
launch {
messages.onEach { println("onEach") }.launchIn(CoroutineScope(Dispatchers.Default))
}
launch {
messages.collect { println("collect") }
}
launch {
messages.launchIn(this)
messages.collect { println("launchIn + collect") }
}
launch {
val new = messages.shareIn(this, SharingStarted.Eagerly, replay = Int.MAX_VALUE)
delay(Duration.seconds(2))
println("new.replayCache: ${new.replayCache}")
}
launch {
println("sharing")
val l = mutableListOf<Int>()
val x = messages.onEach { println("hello") }.launchIn(this)
repeat(1000) {
delay(Duration.seconds(1))
println("result $it: ${messages.replayCache}")
println("result $it: ${messages.subscriptionCount.value}")
println("result $it: ${l}")
}
}
}
我有一个可行的解决方案。感谢 Tenfour04 的回答,这帮助我理解了。
这是一个接近我需要的示例。
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
private val messagesFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
init {
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
messagesFlow.emit(it)
delay(Duration.seconds(0.5))
}
}
}
/** Create a new [SharedFlow] that receives all updates from [messagesFlow] */
fun listen(name: String): SharedFlow<Int> = runBlocking {
val listenerScope = CoroutineScope(SupervisorJob())
val capture = MutableSharedFlow<Int>(
replay = Int.MAX_VALUE,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
messagesFlow
.onEach {
println("$name is getting message $it")
capture.emit(it)
}
.launchIn(listenerScope)
capture.asSharedFlow()
}
/** Create a new [StateFlow], which holds all accumulated values of [messagesFlow] */
suspend fun collectState(name: String): StateFlow<List<Int>> {
return messagesFlow
.runningFold(emptyList<Int>()) { acc, value ->
println("$name is getting message $value")
acc + value
}
.stateIn(publishingScope)
}
}
fun main() {
val publisher = Publisher()
// both Fish and Llama can subscribe at any point, and get all subsequent values
runBlocking {
delay(Duration.seconds(2))
launch {
val listenerFish = publisher.collectState("Fish")
repeat(4) {
println("$it. Fish replayCache ${listenerFish.value}")
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(2))
launch {
val listenerLlama = publisher.listen("Llama")
repeat(4) {
println("$it. Llama replayCache" + listenerLlama.replayCache)
delay(Duration.seconds(2))
}
}
delay(Duration.seconds(10))
}
}
最佳答案
Flow.collect
有一个标记为内部的重载,但有一个公共(public)的 collect
扩展函数非常常用。我建议将这个包罗万象的导入放在文件的顶部,然后扩展函数将在其他 Flow 相关任务中可用:import kotlinx.coroutines.flow.*
launchIn
和 collect
是订阅流的两种最常见的方式。他们都是终端。 “终端”并不意味着它结束 消费……它意味着它开始消费! “非终结”函数是将一个 Flow 包装在另一个 Flow 中而不开始收集它的函数。
“Never complete normally”表示协程中它后面的代码将不会到达。 collect
订阅流并暂停协程直到流完成。由于 SharedFlow 永远不会完成,因此它“永远不会正常完成”。
很难对您的代码发表评论,因为启动您的流程并将其收集在同一个函数中是不寻常的。通常,SharedFlow 将作为属性公开以供其他函数使用。通过将它们全部组合到一个函数中,您隐藏了一个事实,即通常 SharedFlow 可能从不同的协程范围发布而不是从中收集。
这是一个部分改编自您的代码的示例:
class Publisher {
private val publishingScope = CoroutineScope(SupervisorJob())
val messagesFlow: SharedFlow<Int> = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
).also { flow ->
// emit messages
publishingScope.launch {
repeat(100000) {
println("emitting $it")
flow.emit(it)
delay(500)
}
}
}
}
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
// Delay a while. We'll miss the first couple messages.
delay(1300)
// Subscribe to the shared flow
subscribingScope.launch {
publisher.messagesFlow.collect { println("I am colllecting message $it") }
// Any code below collection in this inner coroutine won't be reached because collect doesn't complete normally.
}
delay(3000) // Keep app alive for a while
}
}
由于 collect
通常会阻止它下面的任何代码在协程中运行,因此 launchIn
函数可以使正在发生的事情更加明显,并且更加简洁:
fun main() {
val publisher = Publisher()
runBlocking {
val subscribingScope = CoroutineScope(SupervisorJob())
delay(1300)
publisher.messagesFlow.onEach { println("I am colllecting message $it") }
.launchIn(subscribingScope)
delay(3000)
}
}
关于Kotlin SharedFlow - 如何订阅?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68039394/
我正在查看Kotlin Github page我注意到 Kotlin 语言本身大部分是用 Kotlin 编写的:我只是想知道,一种语言怎么可能大部分都是用它自己的语言编写的?在您可以使用正在创建的语言
我有以下非常简单的 kotlin 代码来演示中缀函数 com.lopushen.demo.presentation 包 fun main(args: Array) { print("Hello
我在 Java 中有 2 个模型类,其中一个扩展了另一个 @UseStag public class GenericMessages extends NavigationLocalizationMap
Kotlin 代码 runBlocking { flow { for (i in 0..4) { println("Emit $i")
这三个 Kotlin 插件和它们的实际作用有什么区别? plugins { id 'kotlin-android' id 'org.jetbrains.kotlin.android'
我正在为某些现有库添加 Kotlin 原生 linuxX64 目标支持。库已成功编译,但在运行测试用例时,出现以下运行时错误: kotlin.native.concurrent.InvalidMuta
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题吗? 通过 editing this post 添加细节并澄清问题. 关闭 2 年前。 Improve t
我创建了一个类并向其添加了一个与成员函数具有相同签名的扩展,并执行了这个方法,它总是执行成员方法。 class Worker { fun work() = "...working" } fun
我知道传递给函数的参数将被视为“val”,即使变量被初始化为“var”。但这对我来说一直是个问题。在下面的示例代码中,我想通过使用函数“changeNum”修改变量“num”的值。但当然,Kotlin
现在,我正在尝试用 Kotlin 重写我的 Java 应用程序。然后,我遇到了日志语句,比如 log.info("do the print thing for {}", arg); 所以我有两种方法可
有点出名article关于许多语言的异步编程模型的状态,指出它们存在“颜色”问题,特别是将生态系统分为两个独立的世界:异步和非异步。以下是这种语言的属性: 每个函数都有一种颜色,红色或蓝色(例如asy
因为 KDoc 文档生成引擎是 abandoned in favor of Dokka , Kotlin 文档应该称为“KDoc 注释”,还是“Dokka 注释”? 最佳答案 如所述here , KD
我想在可空对象上传递函数引用。以 Android 为例,假设我想使用 Activity#onBackPressed来自作为该事件的子级的片段。 如果我想调用这个函数,我可以很容易地做到 activit
我有一个列表 (x, y)其中y只能是 0 或 1 这样 例如: [(3, 0), (3, 1), (5, 1)] [(5, 0), (3, 1), (5, 1)] [(1, 1), (3, 1),
从强类型语言的定义来看: A strongly-typed programming language is one in which each type of data (such as intege
这不能编译的事实是否意味着它们不是一流的类型? fun foo(s: String): Int = s.length // This won't compile. val bar = foo 有没有办
如果在 Java i++是一个表达式和 i++;是一个表达式语句,分号(;) 在 Kotlin 中是可选的,是 i++ Kotlin 中的表达式或表达式语句? 最佳答案 i++是一个表达式,因为它有一
代码(如下所示)是否正确?它取自 Kotlin-docs.pdf 的第 63 页,这也是 https://kotlinlang.org/docs/reference/generics.html 的最后
我正在尝试使用 Kotlin 为 Android 的一些全局 API 解析器(检查网络连接、调用 API 并通过来自源的单个调用返回格式化数据),并且在某些时候我不得不创建一个通用类型 object就
kotlinlang 中的任务: 使用月份变量重写此模式,使其与格式 13 JUN 1992(两位数字、一个空格、一个月份缩写、一个空格、四位数字)中的日期相匹配。 答案是:val month = "
我是一名优秀的程序员,十分优秀!