- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我有一个序列(来自 File.walkTopDown),我需要在每个序列上运行一个长时间运行的操作。我想使用 Kotlin 最佳实践/协程,但我要么没有并行性,要么并行性太多,并遇到“打开文件太多”的 IO 错误。
File("/Users/me/Pictures/").walkTopDown()
.onFail { file, ex -> println("ERROR: $file caused $ex") }
.filter { ... only big images... }
.map { file ->
async { // I *think* I want async and not "launch"...
ImageProcessor.fromFile(file)
}
}
这似乎不是并行运行的,而且我的多核 CPU 永远不会超过 1 个 CPU 的值(value)。有没有办法使用协程来运行“NumberOfCores 并行操作”的延迟作业?
我看了Multithreading using Kotlin Coroutines首先创建所有作业然后加入它们,但这意味着在繁重的处理加入步骤之前完成序列/文件树遍历,这似乎......将其拆分为收集和处理步骤意味着收集可以在处理之前运行。
val jobs = ... the Sequence above...
.toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }
最佳答案
这不是针对您的问题,但它确实回答了“如何限制 kotlin 协程最大并发性”的问题。
编辑:从 kotlinx.coroutines 1.6.0 ( https://github.com/Kotlin/kotlinx.coroutines/issues/2919 ) 开始,您可以使用 limitedParallelism
,例如Dispatchers.IO.limitedParallelism(123)
.
旧解决方案:一开始我想使用 newFixedThreadPoolContext
,但是 1) it's deprecated和 2) 它会使用线程,我认为这不是必要或不可取的(与 Executors.newFixedThreadPool().asCoroutineDispatcher()
相同)。此解决方案可能存在使用 Semaphore 时我不知道的缺陷,但很简单:
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
/**
* Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
*/
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
maxConcurrency: Int,
transform: suspend (TInput) -> TOutput,
) = coroutineScope {
val gate = Semaphore(maxConcurrency)
this@mapConcurrently.map {
async {
gate.withPermit {
transform(it)
}
}
}.awaitAll()
}
测试(抱歉,它使用 Spek、hamcrest 和 kotlin 测试):
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.greaterThanOrEqualTo
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class)
object AsyncHelpersKtTest : Spek({
val actionDelay: Long = 1_000 // arbitrary; obvious if non-test dispatcher is used on accident
val testDispatcher = TestCoroutineDispatcher()
afterEachTest {
// Clean up the TestCoroutineDispatcher to make sure no other work is running.
testDispatcher.cleanupTestCoroutines()
}
describe("mapConcurrently") {
it("should run all inputs concurrently if maxConcurrency >= size") {
val concurrentJobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = inputs.size
// https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
runBlocking(testDispatcher) {
print("start runBlocking $coroutineContext\n")
// We have to run this async so that the code afterwards can advance the virtual clock
val job = launch {
testDispatcher.pauseDispatcher {
val result = inputs.mapConcurrently(maxConcurrency) {
print("action $it $coroutineContext\n")
// Sanity check that we never run more in parallel than max
assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
// Allow for virtual clock adjustment
delay(actionDelay)
// Sanity check that we never run more in parallel than max
assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
print("action $it after delay $coroutineContext\n")
it
}
// Order is not guaranteed, thus a Set
assertEquals(inputs.toSet(), result.toSet())
print("end mapConcurrently $coroutineContext\n")
}
}
print("before advanceTime $coroutineContext\n")
// Start the coroutines
testDispatcher.advanceTimeBy(0)
assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")
testDispatcher.advanceTimeBy(actionDelay)
print("after advanceTime $coroutineContext\n")
assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
job.join()
}
}
it("should run one at a time if maxConcurrency = 1") {
val concurrentJobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = 1
runBlocking(testDispatcher) {
val job = launch {
testDispatcher.pauseDispatcher {
inputs.mapConcurrently(maxConcurrency) {
assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
delay(actionDelay)
assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
it
}
}
}
testDispatcher.advanceTimeBy(0)
assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")
val elapsedTime = testDispatcher.advanceUntilIdle()
print("elapsedTime=$elapsedTime")
assertThat(
"Virtual time should be at least as long as if all jobs ran sequentially",
elapsedTime,
greaterThanOrEqualTo(actionDelay * inputs.size)
)
job.join()
}
}
it("should handle cancellation") {
val jobCounter = AtomicInteger(0)
val inputs = IntRange(1, 2).toList()
val maxConcurrency = 1
runBlocking(testDispatcher) {
val job = launch {
testDispatcher.pauseDispatcher {
inputs.mapConcurrently(maxConcurrency) {
jobCounter.addAndGet(1)
delay(actionDelay)
it
}
}
}
testDispatcher.advanceTimeBy(0)
assertEquals(1, jobCounter.get(), "Only one job should have started")
job.cancel()
testDispatcher.advanceUntilIdle()
assertEquals(1, jobCounter.get(), "Only one job should have run")
job.join()
}
}
}
})
根据 https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing ,您可能还需要调整编译器参数以运行测试:
compileTestKotlin {
kotlinOptions {
// Needed for runBlocking test coroutine dispatcher?
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
}
}
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
关于multithreading - 如何限制 kotlin 协程的最大并发性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47686353/
我有这个代码: private void doSomething() throws InterruptedException { WorkerThread w= new WorkerThrea
我有一个关于并发的简单问题。我正在通过可运行接口(interface)实现线程和并发。如果我首先初始化线程,然后在初始化后单独调用 start,或者如果我初始化线程并从同一个 for 循环中调用 st
我刚开始接触并发,所以如果我问一些明显/愚蠢的问题,请多多包涵。我正在尝试采取第一步来改造我必须利用 Java 货币的模型。没有详细说明,我有一部分模型加载了一些文件,然后在给定请求时它返回文件上的一
我有一个 java 类,它同时被很多线程访问,我想确保它是线程安全的。该类有一个私有(private)字段,它是字符串到字符串列表的映射。我已将 Map 实现为 ConcurrentHashMap 以
正如我们所知,ThreadPoolExecutor 使用一些 BlockingQueue 作为传入任务的队列。我想要的是让 ThreadPoolExecutor 有一个 second 队列,用于准备就
若两个操作同时发生,则称为并发,但事实上,操作是否在时间上重叠并不重要。由于分布式系统复杂的时钟同步问题,现实中很难严格判断两个事件是否同时发生。 为更好定义并发性,并不依赖确切发生时间,即若两个操作
这是计算任意数的阶乘的代码: unsigned long long factorial(int n) { Concurrency::combinable products=Concurrency:
我找不到使用最新的 JAVA 并发例程的这种特定情况的示例。 我计划使用threads 来处理来自开放队列的项目,该队列可能包含 0 到数千个请求。我想限制在任何给定时间有不少于 0 且不超过 10
我正在迈出学习多线程的第一步,并构建一个小测试程序,以便为自己提供一些见解。由于重新排序的可能性,我不相信我的解决方案是安全的.. 这是主程序: public class SynchTest {
我目前正在从事一个搜索引擎项目。为了更快的爬行速度,我在每次链接访问时使用一个 goroutine。但是我遇到了两个让我疑惑的问题! 第一个是代码示例: package main import "fm
我一直在使用 Azure 存储队列通过 QueueTrigger 属性来提供 WebJob。我将 QueueTrigger 配置为将多个项目出队以进行并发处理,如下所示: public static
我正在使用Rails的一些中间件,使用的是最新版本: pfernand-2-mn:~ pfernand$ rails -v Rails 3.1.2 这是rake middleware的输出: use
我一直在尝试使用 Azure Data Lake Store,并且在文档中 Microsoft 声称该系统针对低延迟小文件写入进行了优化。测试它我尝试对单个文件执行大量并行任务写入,但此方法在大多数情
假设我有一个同步的 HashMap,它有一个字符串作为键和一个列表作为值。 Map> map = Collections.synchronizedMap(new HashMap>()); 这个列表是线
这个问题在这里已经有了答案: ConcurrentModificationException for ArrayList [duplicate] (6 个答案) 关闭 9 年前。 我有一个关于我的
问题 当且仅当有空闲 CPU 时,我如何扩展以使用更多线程?像 ThreadPoolExecutor 这样的东西,它在 cpu 核心空闲时使用更多线程,如果没有空闲则更少或只使用一个线程。 用例 现状
这个问题已经有答案了: 已关闭13 年前。 Possible Duplicate: Thread safety in Java class 我正在阅读实践中的 Java 并发,并且我遇到了一个令我困惑
Stroustrup 在C++ 第 4 版第 1193 页中给出了以下示例。我的问题是使用两个线程的并发程序,一个用于 f(),另一个用于 g() 以及 Stroustrup 的声明: if a li
我是一名优秀的程序员,十分优秀!