goroutine 中的数据库调用失败而没有错误

DB calls in goroutine failing without error

我编写了一个脚本来将大量数据从一个数据库迁移到另一个数据库并且运行良好,但现在我想尝试使用 goroutines 通过并发数据库调用来加速脚本。由于更改为调用 go processBatch(offset) 而不是仅调用 processBatch(offset),我可以看到启动了一些 goroutine,但脚本几乎立即完成,实际上什么也没做。每次我调用脚本时,启动的 goroutines 的数量也会有所不同。没有错误(我可以看到)。

总的来说,我对 goroutines 和 Go 还是陌生的,所以非常感谢任何关于我可能做错了什么的指示。我已经从下面的代码中删除了与并发或数据库访问无关的所有逻辑,因为它 运行 在没有更改的情况下很好。我还在我认为失败的地方留下了评论,因为该行下方没有任何内容 运行 (打印不输出)。我还尝试使用 sync.WaitGroup 来错开数据库调用,但它似乎没有任何改变。

var (
    legacyDB     *sql.DB
    v2DB         *sql.DB
)

func main() {

    var total, loops int
    var err error

    legacyDB, err = sql.Open("mysql", "...")
    if err != nil {
        panic(err)
    }
    defer legacyDB.Close()

    v2DB, err = sql.Open("mysql", "...")
    if err != nil {
        panic(err)
    }
    defer v2DB.Close()

    err = legacyDB.QueryRow("SELECT count(*) FROM users").Scan(&total)
    checkErr(err)

    loops = int(math.Ceil(float64(total) / float64(batchsize)))

    fmt.Println("Total: " + strconv.Itoa(total))
    fmt.Println("Loops: " + strconv.Itoa(loops))

    for i := 0; i < loops; i++ {
        offset := i * batchsize

        go processBatch(offset)
    }

    legacyDB.Close()
    v2DB.Close()
}

func processBatch(offset int) {

    query := namedParameterQuery.NewNamedParameterQuery(`
        SELECT ...
        LIMIT :offset,:batchsize
    `)
    query.SetValue(...)

    rows, err := legacyDB.Query(query.GetParsedQuery(), (query.GetParsedParameters())...)
    // nothing after this line gets done (Println here does not show output)
    checkErr(err)
    defer rows.Close()

    ....

    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    log.Printf("\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\n\n", m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC)
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}

正如 Nadh 在评论中提到的,那是因为程序在 main 函数完成时退出,无论是否还有其他 goroutines 运行。要解决此问题,*sync.WaitGroup 就足够了。 WaitGroup 用于您有多个并发操作的情况,并且您希望等到它们全部完成。可以在此处找到文档:https://golang.org/pkg/sync/#WaitGroup.

不使用全局变量的程序示例实现类似于替换

fmt.Println("Total: " + strconv.Itoa(total))
fmt.Println("Loops: " + strconv.Itoa(loops))

for i := 0; i < loops; i++ {
    offset := i * batchsize

    go processBatch(offset)
}

fmt.Println("Total: " + strconv.Itoa(total))
fmt.Println("Loops: " + strconv.Itoa(loops))

wg := new(sync.WaitGroup)
wg.Add(loops)

for i := 0; i < loops; i++ {
    offset := i * batchsize

    go func(offset int) {
        defer wg.Done()
        processBatch(offset)
    }(offset)
}

wg.Wait()