在并行问题上,并发代码比顺序代码慢?

Concurrent code slower than sequential code on parallel problem?

我写了一些代码来执行 Monte Carlo simulations。我写的第一件事是这个顺序版本:

 func simulationSequential(experiment func() bool, numTrials int) float64 {
    ocurrencesEvent := 0

    for trial := 0; trial < numTrials; trial++ {
        eventHappend := experiment()
        if eventHappend {
            ocurrencesEvent++
        }
    }

    return float64(ocurrencesEvent) / float64(numTrials)
}

然后,我想我可以 运行 同时进行一些实验,并使用笔记本电脑的多核更快地获得结果。所以,我写了以下版本:

func simulationConcurrent(experiment func() bool, numTrials, nGoroutines int) float64 {
    ch := make(chan int)
    var wg sync.WaitGroup

    // Launch work in multiple goroutines
    for i := 0; i < nGoroutines; i++ {
        wg.Add(1)
        go func() {
            localOcurrences := 0
            for j := 0; j < numTrials/nGoroutines; j++ {
                eventHappend := experiment()
                if eventHappend {
                    localOcurrences++
                }
            }
            ch <- localOcurrences
            wg.Done()
        }()
    }

    // Close the channel when all the goroutines are done
    go func() {
        wg.Wait()
        close(ch)
    }()

    // Acummulate the results of each experiment
    ocurrencesEvent := 0
    for localOcurrences := range ch {
        ocurrencesEvent += localOcurrences
    }

    return float64(ocurrencesEvent) / float64(numTrials)
}

令我惊讶的是,当我 运行 对两个版本进行基准测试时,我发现顺序版本比并发版本快,并发版本随着我减少数量而变得更好的协程。为什么会这样?我认为并发版本会更快,因为这是一个高度可并行化的问题。

这是我的基准代码:

func tossEqualToSix() bool {
    // Simulate the toss of a six-sided die
    roll := rand.Intn(6) + 1

    if roll != 6 {
        return false
    }
    return true
}

const (
    numsSimBenchmark       = 1_000_000
    numGoroutinesBenckmark = 10
)

func BenchmarkSimulationSequential(b *testing.B) {
    for i := 0; i < b.N; i++ {
        simulationSequential(tossEqualToSix, numsSimBenchmark)
    }
}

func BenchmarkSimulationConcurrent(b *testing.B) {
    for i := 0; i < b.N; i++ {
        simulationConcurrent(tossEqualToSix, numsSimBenchmark, numGoroutinesBenckmark)
    }
}

结果:

goos: linux
goarch: amd64
pkg: github.com/jpuriol/montecarlo
cpu: Intel(R) Core(TM) i7-10510U CPU @ 1.80GHz
BenchmarkSimulationSequential-8               36          30453588 ns/op
BenchmarkSimulationConcurrent-8                9         117462720 ns/op
PASS
ok      github.com/jpuriol/montecarlo   2.478s

您可以从 Github 下载我的代码。

我想我会详细说明我的评论并 post 它带有代码和基准测试结果。

Examine 函数使用 rand 包中的包级 rand 函数。引擎盖下的这些函数使用 rand.RandglobalRand 实例。例如 func Intn(n int) int { return globalRand.Intn(n) }。由于随机数生成器不是线程安全的,因此 globalRand 按以下方式实例化:

/*
 * Top-level convenience functions
 */

var globalRand = New(&lockedSource{src: NewSource(1).(*rngSource)})

type lockedSource struct {
    lk  sync.Mutex
    src *rngSource
}

func (r *lockedSource) Int63() (n int64) {
    r.lk.Lock()
    n = r.src.Int63()
    r.lk.Unlock()
    return
}
...

这意味着 rand.Intn 的所有调用都受到全局锁的保护。结果是检查函数“按顺序工作”,因为有锁。更具体地说,每次调用 rand.Intn 都不会在上一次调用完成之前开始生成随机数。

这里是重新设计的代码。每个检查函数都有自己的随机生成器。假设单个examine function被一个goroutine使用,所以不需要锁保护。

package main

import (
    "math/rand"
    "sync"
    "testing"
    "time"
)

func simulationSequential(experimentFuncFactory func() func() bool, numTrials int) float64 {
    experiment := experimentFuncFactory()
    ocurrencesEvent := 0

    for trial := 0; trial < numTrials; trial++ {
        eventHappend := experiment()
        if eventHappend {
            ocurrencesEvent++
        }
    }

    return float64(ocurrencesEvent) / float64(numTrials)
}

func simulationConcurrent(experimentFuncFactory func() func() bool, numTrials, nGoroutines int) float64 {
    ch := make(chan int)
    var wg sync.WaitGroup

    // Launch work in multiple goroutines
    for i := 0; i < nGoroutines; i++ {
        wg.Add(1)
        go func() {
            experiment := experimentFuncFactory()
            localOcurrences := 0
            for j := 0; j < numTrials/nGoroutines; j++ {
                eventHappend := experiment()
                if eventHappend {
                    localOcurrences++
                }
            }
            ch <- localOcurrences
            wg.Done()
        }()
    }

    // Close the channel when all the goroutines are done
    go func() {
        wg.Wait()
        close(ch)
    }()

    // Acummulate the results of each experiment
    ocurrencesEvent := 0
    for localOcurrences := range ch {
        ocurrencesEvent += localOcurrences
    }

    return float64(ocurrencesEvent) / float64(numTrials)
}

func tossEqualToSix() func() bool {
    prng := rand.New(rand.NewSource(time.Now().UnixNano()))
    return func() bool {
        // Simulate the toss of a six-sided die
        roll := prng.Intn(6) + 1

        if roll != 6 {
            return false
        }
        return true
    }
}

const (
    numsSimBenchmark       = 5_000_000
    numGoroutinesBenchmark = 10
)

func BenchmarkSimulationSequential(b *testing.B) {
    for i := 0; i < b.N; i++ {
        simulationSequential(tossEqualToSix, numsSimBenchmark)
    }
}

func BenchmarkSimulationConcurrent(b *testing.B) {
    for i := 0; i < b.N; i++ {
        simulationConcurrent(tossEqualToSix, numsSimBenchmark, numGoroutinesBenchmark)
    }
}

基准测试结果如下:

goos: darwin
goarch: arm64
pkg: scratchpad
BenchmarkSimulationSequential-8           20      55142896 ns/op
BenchmarkSimulationConcurrent-8           82      12944360 ns/op