Go Channels:如何实现非阻塞?
Go Channels: How to make this non-blocking?
我有一个脚本,它从数据库中选择一些数据并将其发送到一个通道以供多个 goroutine 处理,然后将结果发送回主线程以在数据库上更新。
但是,它在将数据发送到第一个通道时挂起(可能阻塞)。
频道是通过以下方式全局创建的:
var chin = make(chan in)
var chout = make(chan out)
in
和 out
都是 structs
首先启动 goroutines:
for i:=0; i<5; i++ {
go worker()
}
加载频道的代码是:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
紧接着:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
后台有多个 worker()
goroutines 运行:
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
此代码在打印 Getting nextbatch2 and sending to workers
后挂起。它永远不会达到 Processing out channel from workers
。所以它挂在 rows.Next()
循环内的某个地方,我无法弄清楚原因,因为 chin
通道应该是非阻塞的——即使 worker()
goroutines 没有处理它应该至少完成那个循环。
有什么想法吗?
编辑:
通过在 rows.Next()
循环末尾添加 fmt.Println(" on", numtodo)
我可以看到它在 5 之后阻塞,我不明白它应该是非阻塞的,对吧?
编辑 2:
通过将频道更改为 make(chan in/out, 100)
,它现在将在 105 之后阻塞。
Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value.
https://golang.org/doc/effective_go.html#channels
因此您可以将您的消费者代码重写为:
go func(){
for res := range chout {
update5.Exec(res.data, res.id)
}
}()
您还需要 close(chin)
和 close(chout)
才能正确使用 range
语句。
我有一个脚本,它从数据库中选择一些数据并将其发送到一个通道以供多个 goroutine 处理,然后将结果发送回主线程以在数据库上更新。
但是,它在将数据发送到第一个通道时挂起(可能阻塞)。
频道是通过以下方式全局创建的:
var chin = make(chan in)
var chout = make(chan out)
in
和 out
都是 structs
首先启动 goroutines:
for i:=0; i<5; i++ {
go worker()
}
加载频道的代码是:
if verbose {
fmt.Println(`Getting nextbatch2 and sending to workers`)
}
rows, err = nextbatch2.Query()
if err != nil {
panic(err)
}
var numtodo int
for rows.Next() {
err = rows.Scan(&id, &data)
if err != nil {
rows.Close()
panic(err)
}
// Start
var vin in
vin.id = id
vin.data = data
chin <- vin
numtodo++
}
rows.Close()
紧接着:
if verbose {
fmt.Println(`Processing out channel from workers`)
}
for res := range chout {
update5.Exec(res.data, res.id)
if numtodo--; numtodo == 0 {
break
}
}
后台有多个 worker()
goroutines 运行:
func worker() {
for res := range chin {
var v out
v.id = res.id
v.data = process(res.data)
chout <- v
}
}
此代码在打印 Getting nextbatch2 and sending to workers
后挂起。它永远不会达到 Processing out channel from workers
。所以它挂在 rows.Next()
循环内的某个地方,我无法弄清楚原因,因为 chin
通道应该是非阻塞的——即使 worker()
goroutines 没有处理它应该至少完成那个循环。
有什么想法吗?
编辑:
通过在 rows.Next()
循环末尾添加 fmt.Println(" on", numtodo)
我可以看到它在 5 之后阻塞,我不明白它应该是非阻塞的,对吧?
编辑 2:
通过将频道更改为 make(chan in/out, 100)
,它现在将在 105 之后阻塞。
Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value.
https://golang.org/doc/effective_go.html#channels
因此您可以将您的消费者代码重写为:
go func(){
for res := range chout {
update5.Exec(res.data, res.id)
}
}()
您还需要 close(chin)
和 close(chout)
才能正确使用 range
语句。