- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我在弄清楚如何使用 channel 时遇到问题我希望它们在调用发送后立即将值推送给消费者,而不是在加载来自两个源的数据后获取值。
MainActivity.kt
fun loadData() {
textView.text = "LOADING"
launch {
repository.loadData().consumeEach { loaded ->
withContext(Dispatchers.Main) {
logd("Presenting: ${loaded.size}, $loaded")
textView.text = loaded.joinToString { "$it\n" }
}
}
}
Repository.kt
suspend fun loadData(): ReceiveChannel<List<String>> {
return coroutineScope {
produce(capacity = 2) {
launch {
val localData = local.loadData()
send(localData)
}
launch {
val remoteData = remote.loadData()
send(remoteData)
}
}
}
}
Remote.kt
override val data: MutableList<String> = mutableListOf("R1", "R2", "R3", "R4", "R5")
override suspend fun loadData(): List<String> {
logd("Loading remote started")
val wait = Random.nextLong(0, 500)
delay(wait)
logd("Remote loading took $wait")
logd("Loading remote finished: ${data.size}, $data")
return data
}
Local.kt
override val data: MutableList<String> = mutableListOf("L1", "L2", "L3", "L4", "L5")
override suspend fun loadData(): List<String> {
logd("Loading local started")
val wait = Random.nextLong(1000, 2000)
delay(wait)
logd("Local loading took $wait")
logd("Loading local finished: ${data.size}, $data")
return data
}
我在控制台中得到了这个
D/Local: Loading local started
D/Remote: Loading remote started
D/Remote: Remote loading took 265
D/Remote: Loading remote finished: 5, [R1, R2, R3, R4, R5]
D/Local: Local loading took 1650
D/Local: Loading local finished: 5, [L1, L2, L3, L4, L5]
D/DispatchedCoroutine: Presenting: 5, [R1, R2, R3, R4, R5]
D/DispatchedCoroutine: Presenting: 5, [L1, L2, L3, L4, L5]
看起来两个来源的数据都是在达到容量后发出的。我希望它做的是消费者可以在发送数据后立即接收数据。所以控制台输出看起来更像这样。
D/Local: Loading local started
D/Remote: Loading remote started
D/Remote: Remote loading took 265
D/DispatchedCoroutine: Presenting: 5, [R1, R2, R3, R4, R5]
D/Remote: Loading remote finished: 5, [R1, R2, R3, R4, R5]
D/Local: Local loading took 1650
D/Local: Loading local finished: 5, [L1, L2, L3, L4, L5]
D/DispatchedCoroutine: Presenting: 5, [L1, L2, L3, L4, L5]
我如何使用 coroutine.Channel 实现这一点(发送后立即消耗值)?
从 Repository#loadData()
中删除 coroutineScope{...}
后,它开始按预期工作。但现在我遇到了一个问题,我必须将范围作为函数参数传递,这对我来说非常难看。
Repository.kt
suspend fun loadData(scope: CoroutineScope): ReceiveChannel<List<String>> {
return scope.produce(capacity = 2) {
launch {
val localData = local.loadData()
send(localData)
}
launch {
val remoteData = remote.loadData()
send(remoteData)
}
invokeOnClose {
logd("Closing channel")
}
}
}
最佳答案
我认为您的代码看起来不错,可以完成您期望的工作。我认为您遇到的问题是日志记录在发生时没有到达您的控制台。请记住,日志记录本身有自己的缓冲和 IO 线程要经过。我已经尝试了您的代码并改用了 println
,并且我得到了您预期的行为。为了确认,您可以不进行随机等待,而是将每次等待的等待时间增加到 10 秒,并真正让它们一个接一个地发生。只是为了帮助您自己确认这一点,这是我的非 Android 版本的您正在尝试做的事情:
fun main() = runBlocking {
val start = System.currentTimeMillis()
launch(Dispatchers.Unconfined) {
loadData().consumeEach { loaded ->
println("Presenting: ${loaded.size}, $loaded")
}
}.join()
println("The whole thing took ${System.currentTimeMillis() - start}")
}
suspend fun CoroutineScope.loadData() = produce {
launch {
val localData = localloadData()
send(localData)
}
launch {
val remoteData = remoteloadData()
send(remoteData)
}
}
val remoteData: MutableList<String> = mutableListOf("R1", "R2", "R3", "R4", "R5")
suspend fun remoteloadData(): List<String> {
println("Loading remote started")
val wait = 500L
delay(wait)
println("Remote loading took $wait")
println("Loading remote finished: ${remoteData.size}, $remoteData")
return remoteData
}
val localData: MutableList<String> = mutableListOf("L1", "L2", "L3", "L4", "L5")
suspend fun localloadData(): List<String> {
println("Loading local started")
val wait = 1000L
delay(wait)
println("Local loading took $wait")
println("Loading local finished: ${localData.size}, $localData")
return localData
}
它产生这个:
Loading local started
Loading remote started
Remote loading took 500
Loading remote finished: 5, [R1, R2, R3, R4, R5]
Presenting: 5, [R1, R2, R3, R4, R5]
Local loading took 1000
Loading local finished: 5, [L1, L2, L3, L4, L5]
Presenting: 5, [L1, L2, L3, L4, L5]
The whole thing took 1046
编辑:我删除了你用来更新你的值的 withContext(Dispatchers.Main)
- 这真的不需要。您已经在这个阶段完成了异步工作。相反,您需要在顶级 launch
中指定上下文,就像现在一样。
除非您另有说明,否则下面的其余工作应继承该上下文。无需继续将上下文作为参数传递。
如果您确实发现自己处于另一个上下文取代继承上下文的位置,那么可能将其作为参数传递可能是一种方法,但我的偏好是找到一个解决方法并以一种方式表达它确实继承自调用上下文。
关于android - Coroutines Channel values consumeEach waits 而不是一次消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55983353/
我想使用我编写的类模块的事件。类模块如下所示 ''CError64Row Public Event ErrorClicked(ByVal row As Integer, ByVal column As
我正在寻找实现智能架构的良好实践,以及处理针对具有许多不同 wdsl web 服务的系统的集成的方法。 我已经有 2 年的爱好使用 C# 进行开发了~,因此我并不总是使用正确的术语,但我会尝试描述我正
目前,我正在为我的程序使用 Azure Consumer API。但它非常慢,几乎需要8秒才能给出响应。我现在应该怎么做?这是我正在使用的 azure API.. https://management
我的流程是: AcitveMQ 控制台在主题部分下显示了一个使用者,但是一旦
我一直在阅读类似 Why does a function that accepts a Box complain of a value being moved when a function that
AMQP 函数 consume() 是一个带有回调的阻塞函数,是否可以为 consume() 函数设置超时,以便在特定时间后不再阻塞并且代码执行完成? 最佳答案 是的,方法如下: $amqp = ne
我有一个客户端/服务器应用程序,其中客户端以 JSON 形式将对象发送到运行 PHP 脚本的服务器,然后将此数据放入数据库。 问题是解码是用 json_decode 函数完成的,它似乎适用于字符串而不
所以我已经模拟了我的生产者消费者问题并且我有下面的代码。我的问题是:如果消费者一直处于 while(true) 状态,他如何停止。 在下面的代码中,我添加了 i
我无法使用在delphi 中开发的dll 的功能。我在类型转换方面遇到了一些困难。 这是我要调用 DLL 的函数: function rData(ID: Cardinal; queue: WideSt
我想使用 Unity3D 可视化 Kafka 流。在 Unity 中访问数据流的最佳方式是什么? 我已经用 Node 和 C# 编写了基本使用者,但我不确定如何将它们合并到 Unity 中。任何帮助表
如果标题太笼统,我很抱歉,但我已经浏览了一个小时的互联网,但找不到任何架构解释。我对 RSS 和 Atom 协议(protocol)都是全新的,据我到目前为止所了解的是: 服务器发布文档 客户端订阅此
我很喜欢我刚刚发现的 Guzzle 框架。我正在使用它使用不同的响应结构跨多个 API 聚合数据。它可以使用 JSON 和 XML 找到,但我需要使用的服务之一使用 SOAP。是否有使用 Guzzle
有没有一种方法可以像访问 Microsoft.Azure.Management.Fluent 一样访问 Azure.Management.Conclusion.Models? 当我执行以下代码时,我看
我有这个部分场景图树: CustomPane (with onMouseClicked Handler) → ChildNode (with onMousePressed Handler) 当我在
我的问题是这个 json。 http://dev-rexolution.pantheonsite.io/api/noticias 我只需要使用 vuejs 2 使用数组的第一个元素才能显示它,使用我工
我是 ML 新手,一直在研究 CNTK 教程。我已经成功训练了几个模型。 我完成了迁移学习教程 ( https://github.com/Microsoft/CNTK/blob/v2.1/Tutori
我是 RabbitMq 和 AMQP 的新手,但我对 ActiveMQ 和 JMS 有一些经验。我尝试在主题(JMS 中的主题之类的主题)中发布一条消息,并从多个监听器中使用此消息。比如我发布一条消息
我正在尝试让我的服务器解析以下 JSON: {"hardwareId":1,"registerTime":"2017-02-14T03:42:11.482Z","sensorId":1,"temper
我正在开发一个从外部 url 使用 json 的网站,我试过了但是我得到了一个错误 XMLHttpRequest 无法加载 http://reuniyo.com/tst/json.php。 Acces
我正在尝试使用Kafka Streams(即不是简单的Kafka Consumer)从重试主题中读取之前无法处理的事件。我希望从重试主题中进行消费,如果处理仍然失败(例如,如果外部系统已关闭),我希望
我是一名优秀的程序员,十分优秀!