gpt4 book ai didi

android - 具有 Retrofit、Coroutines 和 Suspend 功能的并行请求

转载 作者:行者123 更新时间:2023-12-02 14:59:14 24 4
gpt4 key购买 nike

我正在使用 Retrofit 来发出一些网络请求。我还将协程与“挂起”功能结合使用。

我的问题是:有没有办法改进以下代码。这个想法是并行启动多个请求,并等待它们全部完成,然后再继续该功能。

lifecycleScope.launch {
try {
itemIds.forEach { itemId ->
withContext(Dispatchers.IO) { itemById[itemId] = MyService.getItem(itemId) }
}
} catch (exception: Exception) {
exception.printStackTrace()
}

Log.i(TAG, "All requests have been executed")
}

(请注意,“MyService.getItem()”是一个“挂起”函数。)

我想在这种情况下有比foreach更好的东西。

有人有想法吗?

最佳答案

我准备了三种方法来解决这个问题,从最简单的到最正确的。为了简化方法的呈现,我提取了以下通用代码:

lifecycleScope.launch {
val itemById = try {
fetchItems(itemIds)
} catch (exception: Exception) {
exception.printStackTrace()
}
Log.i(TAG, "Fetched these items: $itemById")
}

在我继续之前,先说明一下:您的 getItem()函数是可挂起的,您无需将其提交到IO调度员。所有协程都可以在主线程上运行。

现在让我们看看如何实现 fetchItems(itemIds) .

1。简单的 forEach

这里我们利用了所有协程代码都可以在主线程上运行的事实:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> {
val itemById = mutableMapOf<Long, Item>()
coroutineScope {
itemIds.forEach { itemId ->
launch { itemById[itemId] = MyService.getItem(itemId) }
}
}
return itemById
}

coroutineScope将等待您的所有协程 launch在里面。即使它们都同时运行,启动的协程仍然分派(dispatch)到单个(主)线程,因此从每个协程更新 map 时不存在并发问题。

2。线程安全变体

它利用单线程上下文的属性这一事实可以被视为第一种方法的限制:它不能推广到基于线程池的上下文。我们可以通过依赖async-await来避免这个限制。机制:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = coroutineScope {
itemIds.map { itemId -> async { itemId to MyService.getItem(itemId) } }
.map { it.await() }
.toMap()
}

这里我们依赖 Collection.map() 的两个不明显的属性:

  1. 它急切地执行所有转换,因此第一次转换为 Deferred<Pair<Long, Item>> 的集合在进入第二阶段之前就已经完成了,我们正在等待所有这些。
  2. 它是一个内联函数,即使函数本身不是suspend fun,它也允许我们在其中编写可挂起的代码。并获得不可暂停的 lambda (Deferred<T>) -> T .

这意味着所有的获取都是同时完成的,但是 map 是在单个协程中组装的。

3。具有改进的并发控制的基于流的方法

上面为我们解决了并发问题,但是没有任何背压。如果您的输入列表非常大,您需要限制同时发出的网络请求数量。

您可以使用Flow来做到这一点基于习语:

suspend fun fetchItems(itemIds: Iterable<Long>): Map<Long, Item> = itemIds
.asFlow()
.flatMapMerge(concurrency = MAX_CONCURRENT_REQUESTS) { itemId ->
flow { emit(itemId to MyService.getItem(itemId)) }
}
.toMap()

神奇之处在于 .flatMapMerge手术。你给它一个函数(T) -> Flow<R>它将对所有输入顺序执行它,但随后它将同时收集它获得的所有流。请注意,我无法简化 flow { emit(getItem()) } }到只是flowOf(getItem())因为getItem()必须在收集流量时延迟调用。

Flow.toMap()目前标准库中没有提供,所以这里是:

suspend fun <K, V> Flow<Pair<K, V>>.toMap(): Map<K, V> {
val result = mutableMapOf<K, V>()
collect { (k, v) -> result[k] = v }
return result
}

关于android - 具有 Retrofit、Coroutines 和 Suspend 功能的并行请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58658630/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com