gpt4 book ai didi

multithreading - Golang : How to capture return values of massively parallel benchmark (> 1 million tasks)?

转载 作者:行者123 更新时间:2023-12-04 13:07:29 25 4
gpt4 key购买 nike

我正在构建一个基本上生成配置的参数优化器,
对所有这些进行基准测试,收集所有结果,对它们进行排序,然后选择相对于基准测试结果的最佳性能配置。
基准测试本身运行良好,但每次运行需要 50 毫秒/2 秒,具体取决于配置。关键是,优化器生成了非常多的配置,这意味着最低端的 100k 和高端的大约 4000 万之间,大约 1-500 万是一个很好的正常范围。显然,单线程版本需要永远,CPU 负载实际上非常低,因为任务相对较轻。
我已经以某种方式设计了基准测试,使其能够很好地与并发一起工作,也就是说,运行程序被封装在一个单独的结构(称为代理)中,而基准测试本质上是一个将所有状态作为参数的纯函数。本质上,每次运行都会创建自己的状态,然后独立于所有其他运行,但所有函数都使用相同(引用)共享数据集。功能如下图。
但是,我很难处理每个 Benchmark 的返回值。过去,在 Scale 中,我们使用 Async/Await 进行任务并行,然后让结果继续运行。 Go Routines,afaik 只适用于没有返回值的函数。在实践中, channel 是从 goroutine 中获取值的最自然的方式。这就是我正在考虑的症结所在:
考虑到我通常有 > 100 万个任务,如何正确有效地捕获返回值?
与此相关的是,Golang 是否真的有一个非常快速的参数优化器?
对于 python,我记得 optuna 提供了出色的结果。
谢谢

func (a *Agent) runOptimization(strategyConfigs []cmdb.Config) (result *bmx.OptimizeResult) {

scores := make([]bmx.BackTestResult, len(strategyConfigs))

println("Run all benchmarks")

for i, config := range strategyConfigs {
state := newState(&config)
score := a.runBenchmark(state)
scores[i] = *score // sort only works on actual values
}


println("Sort results")
sort.Sort(bmx.ByTotalPnL(scores))

println("Select best config")
best := scores[len(scores)-1]

println("Generate best strategy config")
stratConf := a.getStrategyConfig(best.PatternConfig)

println("Return optimization results ")
result = &bmx.OptimizeResult{
Symbol: best.Symbol,
StrategyType: best.StrategyType,
OptimizedConfig: &stratConf,
...

}
return result
}

最佳答案

有多种方法可以做到这一点。
一本“教科书”是这样的:

results := make(chan *score)

for i, config := range strategyConfigs {
state := newState(&config)
go a.runBenchmark(state, results)
}

for i := 0; i < len(strategyConfigs); i++ {
scores[i] = <-results
}
...然后修改您的 runBenchmark不返回任何值的方法
并接受 chan *score 类型的第二个参数.
片段滚动如下:
  • 创建一个 channel 来交换类型 *score 的值.
  • 启动尽可能多的 goroutines 运行 runBenchmark方法——我想——“代理人”。
    该方法发送(指向)一个 score它通过提交给它的 channel 计算的对象并退出。
  • 另一个循环执行与生成的 goroutine 一样多的来自 channel 的接收,并将每个接收到的值放入结果 slice 中。

  • 注意事项:
  • 这预设了 a执行它的 runBenchmark 没问题多个 goroutine 同时运行。
    如果不行,您应该需要创建一个单独的 a运行每个单独的 goroutine。
    鉴于你的例子不是太小,我很难对它有多难/有多简单做出有根据的猜测。
    如果您需要这方面的帮助,请提出一个单独的狭义问题。
  • 如果您将拥有数亿个“策略配置”,那么这种方法就太简单了,因为所有 goroutine 将同时生成,a) 浪费资源; b) 如果数量太大,甚至可能会失败。
    教科书上的解决方法是使用所谓的“扇出”——当你有一个 goroutine 通过一个 channel 接收“任务”并将它们分配到有限数量的工作 goroutines 上时,该 goroutine 始终保持在一定的限制以下。您可以开始 here .

  • 另一种方法是利用这样一个事实,即在 Go 中,数组的每个元素(和 slice ——通过扩展)被视为一个单独的变量。
    这意味着可以同时更新工作程序 goroutine 生成的 slice 的各个元素——只要 slice 是预先分配的,并且在此过程进行时从未重新分配(使用 append 、重新 slice 等操作)。
    为了演示,让我们使用 "wait groups" :
    import "sync"

    var wg sync.WaitGroup

    scores := make([]*score, len(strategyConfigs))

    wg.Add(len(strategyConfigs))
    for i, config := range strategyConfigs {
    state := newState(&config)
    go a.runBenchmark(state, &scores[i], &wg)
    }

    wg.Wait()
    runBenchmark方法应该修改为
    defer wg.Done()
    作为它的第一个语句并接受两个额外的参数——
    类型 *score*sync.WaitGroup .
    在这里, runBenchmark开始在单独的 goroutine 中运行,并传递元素的地址以更新其结果和 WaitGroup 的地址以发出“任务完成”的信号。
    请注意,与第一种情况基本相同的警告适用。
    如您所见,goroutine 确实不返回任何值。
    这主要是因为到时候,产生它的 goroutine 可能早已不复存在,并且无处可返回该值。
    因此,基本上有两种方法可以从 goroutine 中“获取数据”:
  • 在 channel 上发送该值(并让其他 goroutine 从该 channel 接收)。
    这是 Go 的面包和黄油。
    我建议从这种方法开始并使用它,直到您对它感到完全满意为止。
  • 更新内存中的某个地方,提供没有其他 goroutine 做同样的事情(否则你会有数据竞争)。
    在某些情况下,这种方法可能更快(对某些人来说甚至更简单),但此类代码背后的推理可能更难看出。

  • 您可以从 this 开始 this了解基本概念。

    总而言之,有几个指针。
  • 我建议在掌握其基础知识之前,不要开始编写涉及并发的几乎不严肃的代码。
    请从 the relevant part of The Tour of Go 开始然后转到 Go 博客:
  • https://blog.golang.org/pipelines
  • https://blog.golang.org/io2013-talk-concurrency
  • https://blog.golang.org/concurrency-timeouts

  • 首先尝试使用更简单的示例。
  • 关于multithreading - Golang : How to capture return values of massively parallel benchmark (> 1 million tasks)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68721598/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com