gpt4 book ai didi

kotlin - Kotlin Flow 的 GroupBy 运算符

转载 作者:行者123 更新时间:2023-12-01 13:13:16 25 4
gpt4 key购买 nike

我正在尝试从 RxJava 切换到 Kotlin Flow。流量真的很震撼。但是现在在 kotlin Flow 中是否有类似于 RxJava 的“GroupBy”的运算符?

最佳答案

从 Kotlin Coroutines 1.3 开始,标准库似乎没有提供这个操作符。然而,由于Flow的设计由于所有运算符都是扩展函数,提供它的标准库与您自己编写的库之间没有根本区别。

考虑到这一点,以下是我关于如何处理它的一些想法。

1.将每个组收集到列表中

如果您只需要每个键的所有项目的列表,请使用这个简单的实现,它发出成对的 (K, List<T>) :

fun <T, K> Flow<T>.groupToList(getKey: (T) -> K): Flow<Pair<K, List<T>>> = flow {
val storage = mutableMapOf<K, MutableList<T>>()
collect { t -> storage.getOrPut(getKey(t)) { mutableListOf() } += t }
storage.forEach { (k, ts) -> emit(k to ts) }
}

对于这个例子:
suspend fun main() {
val input = 1..10
input.asFlow()
.groupToList { it % 2 }
.collect { println(it) }
}

它打印
(1, [1, 3, 5, 7, 9])
(0, [2, 4, 6, 8, 10])

2.a 为每个组发出一个流

如果您需要完整的 RxJava 语义,将输入流转换为多个输出流(每个不同的键一个),事情就会变得更加复杂。

每当您在输入中看到一个新键时,您必须向下游发出一个新的内部流,然后在再次遇到相同的键时异步地将更多数据插入其中。

这是一个执行此操作的实现:
fun <T, K> Flow<T>.groupBy(getKey: (T) -> K): Flow<Pair<K, Flow<T>>> = flow {
val storage = mutableMapOf<K, SendChannel<T>>()
try {
collect { t ->
val key = getKey(t)
storage.getOrPut(key) {
Channel<T>(32).also { emit(key to it.consumeAsFlow()) }
}.send(t)
}
} finally {
storage.values.forEach { chan -> chan.close() }
}
}

它设置了一个 Channel对于每个键,并将 channel 作为流暴露给下游。

2.b 同时收集和减少分组流

groupBy在将流本身发送到下游后,继续将数据发送到内部流,您必须非常小心收集它们的方式。

您必须同时收集所有内部流,并发级别没有上限。否则,排队等待稍后收集的流 channel 最终会阻塞发送方,最终会陷入死锁。

这是一个可以正确执行此操作的函数:
fun <T, K, R> Flow<Pair<K, Flow<T>>>.reducePerKey(
reduce: suspend Flow<T>.() -> R
): Flow<Pair<K, R>> = flow {
coroutineScope {
this@reducePerKey
.map { (key, flow) -> key to async { flow.reduce() } }
.toList()
.forEach { (key, deferred) -> emit(key to deferred.await()) }
}
}
map stage 为它接收到的每个内部流启动一个协程。协程将其缩减为最终结果。
toList()是一个终端操作,收集整个上游流量,启动所有 async过程中的协程。即使我们仍在收集主流,协程也开始消耗内部流。这对于防止死锁至关重要。

最后,在所有协程启动后,我们启动一个 forEach等待并在最终结果可用时发出最终结果的循环。

您可以在 flatMapMerge 方面实现几乎相同的行为。 :
fun <T, K, R> Flow<Pair<K, Flow<T>>>.reducePerKey(
reduce: suspend Flow<T>.() -> R
): Flow<Pair<K, R>> = flatMapMerge(Int.MAX_VALUE) { (key, flow) ->
flow { emit(key to flow.reduce()) }
}

区别在于排序:第一个实现尊重输入中键的出现顺序,而这个没有。两者表现相似。

3. 例子

此示例对 4000 万个整数进行分组和求和:
suspend fun main() {
val input = 1..40_000_000
input.asFlow()
.groupBy { it % 100 }
.reducePerKey { sum { it.toLong() } }
.collect { println(it) }
}

suspend fun <T> Flow<T>.sum(toLong: suspend (T) -> Long): Long {
var sum = 0L
collect { sum += toLong(it) }
return sum
}

我可以使用 -Xmx64m 成功运行它.在我的 4 核笔记本电脑上,我每秒处理大约 400 万个项目。

像这样根据新解决方案重新定义第一个解决方案很简单:
fun <T, K> Flow<T>.groupToList(getKey: (T) -> K): Flow<Pair<K, List<T>>> =
groupBy(getKey).reducePerKey { toList() }

关于kotlin - Kotlin Flow 的 GroupBy 运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58625271/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com