Goroutines 阻塞连接池

Goroutines blocked connection pool

package main

import (
    "database/sql"
    "fmt"
    _ "github.com/lib/pq"
    "sync"
)

func main() {
    db, _ := sql.Open("postgres", fmt.Sprintf("host=%s dbname=%s user=%s sslmode=disable", "localhost", "dbname", "postgres"))
    defer db.Close()

    db.SetMaxOpenConns(15)
    var wg sync.WaitGroup
    for i := 0; i < 15; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            //#1
            rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
            for rows.Next() {
                //#2
                db.Exec("SELECT * FROM reviews LIMIT 1")
            }
        }()
    }

    wg.Wait()
}

查询#1 打开了 15 个连接,它们将在执行 rows.Next() 时关闭。但是 rows.Next() 将永远不会被执行,因为它包含等待空闲连接的 db.Exec()

如何解决这个问题?

你拥有的是 deadlock。在最坏的情况下,您有 15 个 goroutines 持有 15 个数据库连接,并且所有这 15 个 goroutines 都需要一个新的连接才能继续。但是要获得一个新的连接,就必须提前释放一个连接:死锁。

链接的维基百科文章详细介绍了死锁的预防。例如,当代码执行拥有所需(或将需要)的所有资源时,它应该只进入关键部分(锁定资源)。在这种情况下,这意味着您必须保留 2 个连接(恰好 2 个;如果只有 1 个可用,请留下并等待),如果您有这 2 个,则仅继续进行查询。但是在 Go 中你不能提前保留连接。它们是在您执行查询时根据需要分配的。

通常应该避免这种模式。您不应该编写首先保留(有限)资源(在这种情况下为数据库连接)的代码,并且在释放它之前,它需要另一个资源。

一个简单的解决方法是执行第一个查询,保存其结果(例如保存到 Go 切片中),完成后,继续进行后续查询(但也不要忘记关闭sql.Rows 第一)。这样您的代码就不需要同时有 2 个连接。

别忘了处理错误!为了简洁起见,我省略了它们,但你不应该在你的代码中。

这是它的样子:

go func() {
    defer wg.Done()

    rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
    var data []int // Use whatever type describes data you query
    for rows.Next() {
        var something int
        rows.Scan(&something)
        data = append(data, something)
    }
    rows.Close()

    for _, v := range data {
        // You may use v as a query parameter if needed
        db.Exec("SELECT * FROM reviews LIMIT 1")
    }
}()

请注意,rows.Close() 应作为 defer 语句执行,以确保它会被执行(即使发生恐慌)。但是如果你只是简单地使用defer rows.Close(),那只会在后面的查询执行完之后才执行,所以它不会阻止死锁。所以我会重构它以在另一个函数(可能是匿名函数)中调用它,您可以在其中使用 defer:

    rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
    var data []int // Use whatever type describes data you query
    func() {
        defer rows.Close()
        for rows.Next() {
            var something int
            rows.Scan(&something)
            data = append(data, something)
        }
    }()

另请注意,在第二个 for 循环中准备好的语句(sql.Stmt) acquired by DB.Prepare() 可能是多次执行相同(参数化)查询的更好选择。

另一种选择是在新的 goroutines 中启动后续查询,以便在释放当前锁定的连接(或任何其他 goroutine 锁定的任何其他连接)时执行查询,但是没有显式同步你不在他们被处决时有控制权。它可能看起来像这样:

go func() {
    defer wg.Done()

    rows, _ := db.Query("SELECT * FROM reviews LIMIT 1")
    defer rows.Close()
    for rows.Next() {
        var something int
        rows.Scan(&something)
        // Pass something if needed
        go db.Exec("SELECT * FROM reviews LIMIT 1")
    }
}()

要让您的程序也等待这些 goroutine,请使用您已经在使用的 WaitGroup

        // Pass something if needed
        wg.Add(1)
        go func() {
            defer wg.Done()
            db.Exec("SELECT * FROM reviews LIMIT 1")
        }()