- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我有一个序列(来自 File.walkTopDown),我需要在每个序列上运行一个长时间运行的操作。我想使用 Kotlin 最佳实践/协程,但我要么没有并行性,要么并行性太多,并遇到“打开文件太多”的 IO 错误。
File("/Users/me/Pictures/").walkTopDown()
.onFail { file, ex -> println("ERROR: $file caused $ex") }
.filter { ... only big images... }
.map { file ->
async { // I *think* I want async and not "launch"...
ImageProcessor.fromFile(file)
}
}
这似乎不是并行运行的,而且我的多核 CPU 永远不会超过 1 个 CPU 的值(value)。有没有办法使用协程来运行“NumberOfCores 并行操作”的延迟作业?
我看了Multithreading using Kotlin Coroutines首先创建所有作业然后加入它们,但这意味着在繁重的处理加入步骤之前完成序列/文件树遍历,这似乎......将其拆分为收集和处理步骤意味着收集可以在处理之前运行。
val jobs = ... the Sequence above...
.toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }
最佳答案
这不是针对您的问题,但它确实回答了“如何限制 kotlin 协程最大并发性”的问题。
编辑:从 kotlinx.coroutines 1.6.0 ( https://github.com/Kotlin/kotlinx.coroutines/issues/2919 ) 开始,您可以使用 limitedParallelism
,例如Dispatchers.IO.limitedParallelism(123)
.
旧解决方案:一开始我想使用 newFixedThreadPoolContext
,但是 1) it's deprecated和 2) 它会使用线程,我认为这不是必要或不可取的(与 Executors.newFixedThreadPool().asCoroutineDispatcher()
相同)。此解决方案可能存在使用 Semaphore 时我不知道的缺陷,但很简单:
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
/**
* Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
*/
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
maxConcurrency: Int,
transform: suspend (TInput) -> TOutput,
) = coroutineScope {
val gate = Semaphore(maxConcurrency)
this@mapConcurrently.map {
async {
gate.withPermit {
transform(it)
}
}
}.awaitAll()
}
测试(抱歉,它使用 Spek、hamcrest 和 kotlin 测试):
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.greaterThanOrEqualTo
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class)
object AsyncHelpersKtTest : Spek({
val actionDelay: Long = 1_000 // arbitrary; obvious if non-test dispatcher is used on accident
val testDispatcher = TestCoroutineDispatcher()
afterEachTest {
// Clean up the TestCoroutineDispatcher to make sure no other work is running.
testDispatcher.cleanupTestCoroutines()
}
describe("mapConcurrently") {
it("should run all inputs concurrently if maxConcurrency >= size") {
val concurrentJobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = inputs.size
// https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
runBlocking(testDispatcher) {
print("start runBlocking $coroutineContext\n")
// We have to run this async so that the code afterwards can advance the virtual clock
val job = launch {
testDispatcher.pauseDispatcher {
val result = inputs.mapConcurrently(maxConcurrency) {
print("action $it $coroutineContext\n")
// Sanity check that we never run more in parallel than max
assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
// Allow for virtual clock adjustment
delay(actionDelay)
// Sanity check that we never run more in parallel than max
assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
print("action $it after delay $coroutineContext\n")
it
}
// Order is not guaranteed, thus a Set
assertEquals(inputs.toSet(), result.toSet())
print("end mapConcurrently $coroutineContext\n")
}
}
print("before advanceTime $coroutineContext\n")
// Start the coroutines
testDispatcher.advanceTimeBy(0)
assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")
testDispatcher.advanceTimeBy(actionDelay)
print("after advanceTime $coroutineContext\n")
assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
job.join()
}
}
it("should run one at a time if maxConcurrency = 1") {
val concurrentJobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = 1
runBlocking(testDispatcher) {
val job = launch {
testDispatcher.pauseDispatcher {
inputs.mapConcurrently(maxConcurrency) {
assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
delay(actionDelay)
assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
it
}
}
}
testDispatcher.advanceTimeBy(0)
assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")
val elapsedTime = testDispatcher.advanceUntilIdle()
print("elapsedTime=$elapsedTime")
assertThat(
"Virtual time should be at least as long as if all jobs ran sequentially",
elapsedTime,
greaterThanOrEqualTo(actionDelay * inputs.size)
)
job.join()
}
}
it("should handle cancellation") {
val jobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = 1
runBlocking(testDispatcher) {
val job = launch {
testDispatcher.pauseDispatcher {
inputs.mapConcurrently(maxConcurrency) {
jobCounter.addAndGet(1)
delay(actionDelay)
it
}
}
}
testDispatcher.advanceTimeBy(0)
assertEquals(1, jobCounter.get(), "Only one job should have started")
job.cancel()
testDispatcher.advanceUntilIdle()
assertEquals(1, jobCounter.get(), "Only one job should have run")
job.join()
}
}
}
})
根据 https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing ,您可能还需要调整编译器参数以运行测试:
compileTestKotlin {
kotlinOptions {
// Needed for runBlocking test coroutine dispatcher?
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
}
}
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
关于multithreading - 如何限制 kotlin 协程的最大并发性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47686353/
我正在查看Kotlin Github page我注意到 Kotlin 语言本身大部分是用 Kotlin 编写的:我只是想知道,一种语言怎么可能大部分都是用它自己的语言编写的?在您可以使用正在创建的语言
我有以下非常简单的 kotlin 代码来演示中缀函数 com.lopushen.demo.presentation 包 fun main(args: Array) { print("Hello
我在 Java 中有 2 个模型类,其中一个扩展了另一个 @UseStag public class GenericMessages extends NavigationLocalizationMap
Kotlin 代码 runBlocking { flow { for (i in 0..4) { println("Emit $i")
这三个 Kotlin 插件和它们的实际作用有什么区别? plugins { id 'kotlin-android' id 'org.jetbrains.kotlin.android'
我正在为某些现有库添加 Kotlin 原生 linuxX64 目标支持。库已成功编译,但在运行测试用例时,出现以下运行时错误: kotlin.native.concurrent.InvalidMuta
关闭。这个问题需要details or clarity .它目前不接受答案。 想改进这个问题吗? 通过 editing this post 添加细节并澄清问题. 关闭 2 年前。 Improve t
我创建了一个类并向其添加了一个与成员函数具有相同签名的扩展,并执行了这个方法,它总是执行成员方法。 class Worker { fun work() = "...working" } fun
我知道传递给函数的参数将被视为“val”,即使变量被初始化为“var”。但这对我来说一直是个问题。在下面的示例代码中,我想通过使用函数“changeNum”修改变量“num”的值。但当然,Kotlin
现在,我正在尝试用 Kotlin 重写我的 Java 应用程序。然后,我遇到了日志语句,比如 log.info("do the print thing for {}", arg); 所以我有两种方法可
有点出名article关于许多语言的异步编程模型的状态,指出它们存在“颜色”问题,特别是将生态系统分为两个独立的世界:异步和非异步。以下是这种语言的属性: 每个函数都有一种颜色,红色或蓝色(例如asy
因为 KDoc 文档生成引擎是 abandoned in favor of Dokka , Kotlin 文档应该称为“KDoc 注释”,还是“Dokka 注释”? 最佳答案 如所述here , KD
我想在可空对象上传递函数引用。以 Android 为例,假设我想使用 Activity#onBackPressed来自作为该事件的子级的片段。 如果我想调用这个函数,我可以很容易地做到 activit
我有一个列表 (x, y)其中y只能是 0 或 1 这样 例如: [(3, 0), (3, 1), (5, 1)] [(5, 0), (3, 1), (5, 1)] [(1, 1), (3, 1),
从强类型语言的定义来看: A strongly-typed programming language is one in which each type of data (such as intege
这不能编译的事实是否意味着它们不是一流的类型? fun foo(s: String): Int = s.length // This won't compile. val bar = foo 有没有办
如果在 Java i++是一个表达式和 i++;是一个表达式语句,分号(;) 在 Kotlin 中是可选的,是 i++ Kotlin 中的表达式或表达式语句? 最佳答案 i++是一个表达式,因为它有一
代码(如下所示)是否正确?它取自 Kotlin-docs.pdf 的第 63 页,这也是 https://kotlinlang.org/docs/reference/generics.html 的最后
我正在尝试使用 Kotlin 为 Android 的一些全局 API 解析器(检查网络连接、调用 API 并通过来自源的单个调用返回格式化数据),并且在某些时候我不得不创建一个通用类型 object就
kotlinlang 中的任务: 使用月份变量重写此模式,使其与格式 13 JUN 1992(两位数字、一个空格、一个月份缩写、一个空格、四位数字)中的日期相匹配。 答案是:val month = "
我是一名优秀的程序员,十分优秀!