gpt4 book ai didi

Kotlin Coroutines channel 在 fixedRateTimer 内发送

转载 作者:行者123 更新时间:2023-12-02 13:28:36 27 4
gpt4 key购买 nike

我正在第一次使用 Kotlin Coroutines 从事一个爱好项目。我已经阅读并观看了有关它的视频,我有点了解这个概念。但我遇到了一个问题。让我告诉你我的代码。

package com.dev.tuber.ingestion.snapshots

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import org.joda.time.LocalTime
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.fixedRateTimer

object SnapshotsBuffer {
private val buffer = ConcurrentHashMap<Int, MutableMap<Int, Queue<Snapshot>>>()

init {
for (minute in 0..59) {
buffer[minute] = mutableMapOf()
}
}

suspend fun start(snapshotsChannel: Channel<Snapshot>, composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
startComposing(composeSnapshots)

for (snapshot in snapshotsChannel) {
val currentMinute = getCurrentMinute()
if (!buffer[currentMinute]!!.containsKey(snapshot.pair.id)) {
buffer[currentMinute]!![snapshot.pair.id] = LinkedList()
}

buffer[currentMinute]!![snapshot.pair.id]!!.add(snapshot)
println(buffer)
}
}

private fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
val oneMinute = (1000 * 60).toLong()

fixedRateTimer("consuming", true, oneMinute, oneMinute) {
val previousMinute = getPreviousMinute()
composeSnapshots.send(buffer[previousMinute]!!) <---- cannot do this
buffer[getPreviousMinute()] = mutableMapOf()
}
}

private fun getCurrentMinute(): Int {
return LocalTime().minuteOfHour
}

private fun getPreviousMinute(): Int {
val currentMinute = getCurrentMinute()

if(currentMinute == 0) return 59
return currentMinute - 1
}
}
所以。我有两个 channel 。第一个 channel 是 snapshotsChannel ,这是 Snapshot将要到达。我想缓冲 Snapshot并且只要一分钟过去,我想将缓冲区发送到 composeSnapshots进一步处理的 channel 。
基本上我得到了很多 Snapshot而且我不想将它们直接发送到进一步处理。所以这就是为什么我想每对每分钟缓冲它们。
问题出现在 startComposing功能。 fixedRateTimer不是可暂停的功能,所以我不能在这里使用发送功能。我现在有点卡住了,因为我找不到解决方案。我研究了 TickerChannel 和 Kotlin Flow,但这似乎不是解决我问题的正确方法。
你知道解决办法吗?

最佳答案

您不能从非挂起函数调用挂起函数 ( suspend fun Channel.send(element: E) )。
在协程方式中,您可以有一个无限循环,它会暂停一分钟并重复发送到 channel 。最棒的是,延迟与取消是合作的。

private suspend fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
val oneMinute = (1000 * 60).toLong()

while(true) {
delay(oneMinute)

val previousMinute = getPreviousMinute()
composeSnapshots.send(buffer[previousMinute]!!)
buffer[previousMinute] = mutableMapOf()
}
}

关于Kotlin Coroutines channel 在 fixedRateTimer 内发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62717097/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com