gpt4 book ai didi

multithreading - 如何限制 kotlin 协程的最大并发性

转载 作者:IT老高 更新时间:2023-10-28 13:44:06 26 4
gpt4 key购买 nike

我有一个序列(来自 File.walkTopDown),我需要在每个序列上运行一个长时间运行的操作。我想使用 Kotlin 最佳实践/协程,但我要么没有并行性,要么并行性太多,并遇到“打开文件太多”的 IO 错误。

File("/Users/me/Pictures/").walkTopDown()
.onFail { file, ex -> println("ERROR: $file caused $ex") }
.filter { ... only big images... }
.map { file ->
async { // I *think* I want async and not "launch"...
ImageProcessor.fromFile(file)
}
}

这似乎不是并行运行的,而且我的多核 CPU 永远不会超过 1 个 CPU 的值(value)。有没有办法使用协程来运行“NumberOfCores 并行操作”的延迟作业?

我看了Multithreading using Kotlin Coroutines首先创建所有作业然后加入它们,但这意味着在繁重的处理加入步骤之前完成序列/文件树遍历,这似乎......将其拆分为收集和处理步骤意味着收集可以在处理之前运行。

val jobs = ... the Sequence above...
.toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }

最佳答案

这不是针对您的问题,但它确实回答了“如何限制 kotlin 协程最大并发性”的问题。

编辑:从 kotlinx.coroutines 1.6.0 ( https://github.com/Kotlin/kotlinx.coroutines/issues/2919 ) 开始,您可以使用 limitedParallelism,例如Dispatchers.IO.limitedParallelism(123).

旧解决方案:一开始我想使用 newFixedThreadPoolContext,但是 1) it's deprecated和 2) 它会使用线程,我认为这不是必要或不可取的(与 Executors.newFixedThreadPool().asCoroutineDispatcher() 相同)。此解决方案可能存在使用 Semaphore 时我不知道的缺陷,但很简单:

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

/**
* Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
*/
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
maxConcurrency: Int,
transform: suspend (TInput) -> TOutput,
) = coroutineScope {
val gate = Semaphore(maxConcurrency)
this@mapConcurrently.map {
async {
gate.withPermit {
transform(it)
}
}
}.awaitAll()
}

测试(抱歉,它使用 Spek、hamcrest 和 kotlin 测试):

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.greaterThanOrEqualTo
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
object AsyncHelpersKtTest : Spek({
val actionDelay: Long = 1_000 // arbitrary; obvious if non-test dispatcher is used on accident
val testDispatcher = TestCoroutineDispatcher()

afterEachTest {
// Clean up the TestCoroutineDispatcher to make sure no other work is running.
testDispatcher.cleanupTestCoroutines()
}

describe("mapConcurrently") {
it("should run all inputs concurrently if maxConcurrency >= size") {
val concurrentJobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = inputs.size

// https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
runBlocking(testDispatcher) {
print("start runBlocking $coroutineContext\n")

// We have to run this async so that the code afterwards can advance the virtual clock
val job = launch {
testDispatcher.pauseDispatcher {
val result = inputs.mapConcurrently(maxConcurrency) {
print("action $it $coroutineContext\n")

// Sanity check that we never run more in parallel than max
assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))

// Allow for virtual clock adjustment
delay(actionDelay)

// Sanity check that we never run more in parallel than max
assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
print("action $it after delay $coroutineContext\n")

it
}

// Order is not guaranteed, thus a Set
assertEquals(inputs.toSet(), result.toSet())
print("end mapConcurrently $coroutineContext\n")
}
}
print("before advanceTime $coroutineContext\n")

// Start the coroutines
testDispatcher.advanceTimeBy(0)
assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")

testDispatcher.advanceTimeBy(actionDelay)
print("after advanceTime $coroutineContext\n")
assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
job.join()
}
}

it("should run one at a time if maxConcurrency = 1") {
val concurrentJobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = 1

runBlocking(testDispatcher) {
val job = launch {
testDispatcher.pauseDispatcher {
inputs.mapConcurrently(maxConcurrency) {
assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
delay(actionDelay)
assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
it
}
}
}

testDispatcher.advanceTimeBy(0)
assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")

val elapsedTime = testDispatcher.advanceUntilIdle()
print("elapsedTime=$elapsedTime")
assertThat(
"Virtual time should be at least as long as if all jobs ran sequentially",
elapsedTime,
greaterThanOrEqualTo(actionDelay * inputs.size)
)
job.join()
}
}

it("should handle cancellation") {
val jobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = 1

runBlocking(testDispatcher) {
val job = launch {
testDispatcher.pauseDispatcher {
inputs.mapConcurrently(maxConcurrency) {
jobCounter.addAndGet(1)
delay(actionDelay)
it
}
}
}

testDispatcher.advanceTimeBy(0)
assertEquals(1, jobCounter.get(), "Only one job should have started")

job.cancel()
testDispatcher.advanceUntilIdle()
assertEquals(1, jobCounter.get(), "Only one job should have run")
job.join()
}
}
}
})

根据 https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing ,您可能还需要调整编译器参数以运行测试:

compileTestKotlin {
kotlinOptions {
// Needed for runBlocking test coroutine dispatcher?
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
}
}
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'

关于multithreading - 如何限制 kotlin 协程的最大并发性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47686353/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com