Go Channels:如何实现非阻塞?

Go Channels: How to make this non-blocking?

我有一个脚本,它从数据库中选择一些数据并将其发送到一个通道以供多个 goroutine 处理,然后将结果发送回主线程以在数据库上更新。

但是,它在将数据发送到第一个通道时挂起(可能阻塞)。

频道是通过以下方式全局创建的:

var chin = make(chan in)
var chout = make(chan out)

inout 都是 structs

首先启动 goroutines:

for i:=0; i<5; i++ {
     go worker()
}

加载频道的代码是:

        if verbose {
            fmt.Println(`Getting nextbatch2 and sending to workers`)
        }

        rows, err = nextbatch2.Query()
        if err != nil {
            panic(err)
        }

        var numtodo int
        for rows.Next() {
            err = rows.Scan(&id, &data)
            if err != nil {
                rows.Close()
                panic(err)
            }

            // Start
            var vin in
            vin.id = id
            vin.data = data
            chin <- vin
            numtodo++
        }
        rows.Close()

紧接着:

        if verbose {
            fmt.Println(`Processing out channel from workers`)
        }

        for res := range chout {
            update5.Exec(res.data, res.id)
            if numtodo--; numtodo == 0 {
                break
            }
        }

后台有多个 worker() goroutines 运行:

func worker() {
      for res := range chin {
            var v out
            v.id = res.id
            v.data = process(res.data)
            chout <- v
      }
}

此代码在打印 Getting nextbatch2 and sending to workers 后挂起。它永远不会达到 Processing out channel from workers。所以它挂在 rows.Next() 循环内的某个地方,我无法弄清楚原因,因为 chin 通道应该是非阻塞的——即使 worker() goroutines 没有处理它应该至少完成那个循环。

有什么想法吗?

编辑:

通过在 rows.Next() 循环末尾添加 fmt.Println(" on", numtodo) 我可以看到它在 5 之后阻塞,我不明白它应该是非阻塞的,对吧?

编辑 2:

通过将频道更改为 make(chan in/out, 100),它现在将在 105 之后阻塞。

Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value.

https://golang.org/doc/effective_go.html#channels

因此您可以将您的消费者代码重写为:

go func(){
    for res := range chout {
        update5.Exec(res.data, res.id)
    }
}()

您还需要 close(chin)close(chout) 才能正确使用 range 语句。