gpt4 book ai didi

kotlin 协程 - 在运行阻塞中使用主线程

转载 作者:行者123 更新时间:2023-12-04 15:29:51 24 4
gpt4 key购买 nike

我正在尝试执行以下代码:

 val jobs = listOf(...)
return runBlocking(CommonPool) {
val executed = jobs.map {
async { it.execute() }
}.toTypedArray()
awaitAll(*executed)
}

哪里 jobs是一些 Supplier的列表s - 在同步世界中,这应该只是创建,例如,整数列表。
一切正常,但问题是主线程没有被利用。来自 YourKit 的以下屏幕截图:
enter image description here

所以,问题是 - 我怎样才能利用主线程?

我想 runBlocking问题出在这里,但是还有其他方法可以收到相同的结果吗?使用 Java 并行流看起来好多了,但是主线程仍然没有被完全利用(任务是完全独立的)。

更新

好吧,也许我告诉你的事情太少了。
我的问题是在观看 Vankant Subramaniam 演示后一段时间提出的: https://youtu.be/0hQvWIdwnw4 .
我需要最高性能,没有 IO,没有 Ui 等。只有计算。只有请求,我需要使用我所有可用的资源。

我所拥有的一种想法是将 paralleizm 设置为线程数 + 1,但我认为这很愚蠢。

最佳答案

我使用 Java 8 并行流测试了该解决方案:

jobs.parallelStream().forEach { it.execute() }

我发现 CPU 利用率可靠地为 100%。作为引用,我使用了这个计算工作:
class MyJob {
fun execute(): Double {
val rnd = ThreadLocalRandom.current()
var d = 1.0
(1..rnd.nextInt(1_000_000)).forEach { _ ->
d *= 1 + rnd.nextDouble(0.0000001)
}
return d
}
}

请注意,其持续时间从零到执行 100,000,000 次 FP 乘法所需的时间随机变化。

出于好奇,我还研究了您添加到问题中的代码作为适合您的解决方案。我发现它有很多问题,例如:
  • 将所有结果累积到一个列表中,而不是在它们可用时进行处理
  • 提交最后一个作业后立即关闭结果 channel ,而不是等待所有结果

  • 我自己编写了一些代码,并添加了一些代码来对 Stream API 单线进行基准测试。这里是:
    const val NUM_JOBS = 1000
    val jobs = (0 until NUM_JOBS).map { MyJob() }


    fun parallelStream(): Double =
    jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

    fun channels(): Double {
    val resultChannel = Channel<Double>(UNLIMITED)

    val mainComputeChannel = Channel<MyJob>()
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
    GlobalScope.actor<MyJob>(Dispatchers.Default) {
    for (job in channel) {
    job.execute().also { resultChannel.send(it) }
    }
    }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    GlobalScope.launch {
    jobs.forEach { job ->
    select {
    allComputeChannels.forEach { chan ->
    chan.onSend(job) {}
    }
    }
    }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
    var completedCount = 0
    var sum = 0.0
    while (completedCount < NUM_JOBS) {
    select<Unit> {
    mainComputeChannel.onReceive { job ->
    job.execute().also { resultChannel.send(it) }
    }
    resultChannel.onReceive { result ->
    sum += result
    completedCount++
    }
    }
    }
    sum
    }
    }

    fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    }

    fun measure(task: String, measuredCode: () -> Double) {
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
    }

    这是我的典型结果:
    Parallel Stream took 396.85 ms
    Channels took 398.1 ms

    结果是相似的,但一行代码仍然胜过 50 行代码:)

    关于kotlin 协程 - 在运行阻塞中使用主线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51450841/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com