Golang:如何捕获大规模并行基准(> 100 万个任务)的 return 值?

Golang: How to capture return values of massively parallel benchmark (> 1 million tasks)?

我正在构建一个主要生成配置的参数优化器, 对所有这些进行基准测试,收集所有结果,对它们进行排序,然后选择相对于基准测试结果性能最佳的配置。

基准测试本身运行良好,但每 运行 需要 50 毫秒到 2 秒,具体取决于配置。关键是,优化器会生成非常大量的配置,这意味着在最低端的 100k 和高端的大约 4000 万之间,大约 1 - 500 万是一个很好的正常范围。显然,单线程版本需要很长时间,CPU负载实际上很低,因为任务相对较轻。

我已经以一种使其能够很好地处理并发的方式设计了基准测试,也就是说,运行ner 被封装在一个单独的结构(称为代理)中,并且基准测试本质上是一个纯函数,它将所有状态作为参数。本质上,每个 运行 创建自己的状态,然后 运行 独立于所有其他状态,但所有功能都使用相同的(引用的)共享数据集。函数如下图

但是,我很难处理每个基准的 return 值。过去,在 Scale 中,我们使用 Async / Await 来实现任务并行性,并让结果继续进行。 Go Routines,afaik 只适用于没有 return 值的函数。在实践中,通道是从 goroutine 获取值的最自然的方式。这就是我正在考虑的症结所在:

考虑到我通常有 > 100 万个任务,我如何正确有效地捕获 return 值?

与此相关,Golang 是否真的有一个非常快速的参数优化器? 对于 python,我记得 optuna 提供了出色的结果。

谢谢

func (a *Agent) runOptimization(strategyConfigs []cmdb.Config) (result *bmx.OptimizeResult) {

scores := make([]bmx.BackTestResult, len(strategyConfigs))

println("Run all benchmarks")

for i, config := range strategyConfigs {
    state := newState(&config)
    score := a.runBenchmark(state)
    scores[i] = *score // sort only works on actual values
}


println("Sort results")
sort.Sort(bmx.ByTotalPnL(scores))

println("Select best config")
best := scores[len(scores)-1]

println("Generate best strategy config")
stratConf := a.getStrategyConfig(best.PatternConfig)

println("Return optimization results ")
result = &bmx.OptimizeResult{
    Symbol:          best.Symbol,
    StrategyType:    best.StrategyType,
    OptimizedConfig: &stratConf,
    ...

}
    return result
 }
 

有多种方法可以做到这一点。

一本“教科书”是这样的:

results := make(chan *score)

for i, config := range strategyConfigs {
    state := newState(&config)
    go a.runBenchmark(state, results)
}

for i := 0; i < len(strategyConfigs); i++ {
  scores[i] = <-results
}

...然后修改您的 runBenchmark 方法以不 return 任何值 并接受 chan *score.

类型的第二个参数

片段是这样滚动的:

  1. 创建一个通道来交换 *score 类型的值。
  2. 启动尽可能多的 goroutines 运行ning runBenchmark 方法——我想——“一个代理”。
    该方法发送(指向)一个 score 它通过提交给它的通道计算的对象并退出。
  3. 另一个循环执行与生成的 goroutine 一样多的来自通道的接收,并将每个接收到的值放入生成的切片中。

注意事项:

  • 这假定 a 可以同时使用多个 goroutines 运行ning 执行它的 runBenchmark

    如果不行,您可能需要为每个单独的 goroutine 创建一个单独的 a 到 运行。
    鉴于您的示例不是太小,我很难对 hard/simple 的情况做出有根据的猜测。
    如果您需要这方面的帮助,请提出一个单独的狭义问题。

  • 如果你将拥有数亿个“策略配置”,这种方法将过于简单,因为所有 goroutine 将同时生成,a) 是一种资源浪费; b) 如果数字太大,甚至可能会失败。
    教科书式的修复方法是使用所谓的“扇出”——当您有一个 goroutine 通过通道接收“任务”并将它们分发到有限数量的 worker goroutines 时,这些 goroutines 始终保持在一定限制以下。你可以开始 here.

另一种方法是利用这样一个事实,即在 Go 中,数组的每个元素(以及切片 — 扩展)都被视为一个单独的变量。 这意味着可以同时更新来自 worker goroutines 的结果切片的各个元素——只要切片是预先分配的并且在这个过程正在进行时永远不会重新分配(使用 append、resliced 等操作)。

为了演示,让我们使用 "wait groups":

import "sync"

var wg sync.WaitGroup

scores := make([]*score, len(strategyConfigs))

wg.Add(len(strategyConfigs))
for i, config := range strategyConfigs {
    state := newState(&config)
    go a.runBenchmark(state, &scores[i], &wg)
}

wg.Wait()

runBenchmark方法应该修改为

defer wg.Done()

作为它的第一个声明并接受两个附加参数—— 类型 *score*sync.WaitGroup.

在这里,runBenchmark 在一个单独的 goroutine 中开始到 运行,并传递一个元素的地址以更新其结果和一个等待组的地址以发出“任务完成”信号上。

请注意,与第一种情况基本相同的注意事项适用。

如您所见,goroutine 确实 return 没有任何值。 这主要是因为到它可以的时候,产生它的 goroutine 可能早已不复存在,并且 return 那个值无处可去。

因此基本上有两种方法可以从 goroutine 中“取出数据”:

  • 在通道上发送该值(并让其他 goroutine 从该通道接收)。

    这是围棋的基础。 我会建议从这种方法开始并使用它,直到您完全适应它为止。

  • 更新内存中的某个位置,前提是没有其他 goroutine 做同样的事情(否则你会有数据竞争)。

    在某些情况下,这种方法可能更快(对某些人来说甚至更简单),但此类代码背后的原因可能更难理解。

您可以从 this an this 开始了解基本概念。


总而言之,有几点建议。