sync.Cond 测试广播 - 为什么要循环检查?
sync.Cond Test Broadcast - why check in a loop?
我正在尝试使用 sync.Cond - 等待和广播。我无法理解其中的某些部分:
Wait calls 的评论说:
41 // Because c.L is not locked when Wait first resumes, the caller
42 // typically cannot assume that the condition is true when
43 // Wait returns. Instead, the caller should Wait in a loop:
44 //
45 // c.L.Lock()
46 // for !condition() {
47 // c.Wait()
48 // }
49 // ... make use of condition ...
50 // c.L.Unlock()
需要这样做的原因是什么?
所以这意味着以下程序可能不正确(虽然它有效):
package main
import (
"bufio"
"fmt"
"os"
"sync"
)
type processor struct {
n int
c *sync.Cond
started chan int
}
func newProcessor(n int) *processor {
p := new(processor)
p.n = n
p.c = sync.NewCond(&sync.Mutex{})
p.started = make(chan int, n)
return p
}
func (p *processor) start() {
for i := 0; i < p.n; i++ {
go p.process(i)
}
for i := 0; i < p.n; i++ {
<-p.started
}
p.c.L.Lock()
p.c.Broadcast()
p.c.L.Unlock()
}
func (p *processor) process(f int) {
fmt.Printf("fork : %d\n", f)
p.c.L.Lock()
p.started <- f
p.c.Wait()
p.c.L.Unlock()
fmt.Printf("process: %d - out of wait\n", f)
}
func main() {
p := newProcessor(5)
p.start()
reader := bufio.NewReader(os.Stdin)
_,_ =reader.ReadString('\n')
}
条件变量不会保持信号状态,它们只会唤醒阻塞在 .Wait() 中的其他 go 例程。所以这会出现竞争条件,除非你有一个谓词来检查你是否需要等待,或者你想要等待的事情是否已经发生。
在您的特定情况下,您已经通过使用您的 p.started
通道在调用 .Wait()
的 go 例程和调用 .BroadCast()
的例程之间添加了同步,其方式尽可能我知道不应该出现我在 post 中进一步描述的竞争条件。虽然我不会打赌,但就我个人而言,我只是按照文档描述的惯用方式来做。
考虑您的 start()
函数正在这些行中执行广播:
p.c.L.Lock()
p.c.Broadcast()
在那个特定的时间点,考虑到您的其他 go 例程之一已经在您的 process()
函数中达到了这一点
fmt.Printf("fork : %d\n", f)
go 例程要做的下一件事是锁定互斥锁(至少在 start()
中的 go 例程释放该互斥锁之前它不会拥有它)并等待条件变量。
p.c.L.Lock()
p.started <- f
p.c.Wait()
但是 Wait 永远不会 return,因为此时没有人会 signal/broadcast 它 - 信号已经发生。
所以你需要另一个你可以测试自己的条件,这样当你已经知道条件已经发生时就不需要调用 Wait(),例如
type processor struct {
n int
c *sync.Cond
started chan int
done bool //added
}
...
func (p *processor) start() {
for i := 0; i < p.n; i++ {
go p.process(i)
}
for i := 0; i < p.n; i++ {
<-p.started
}
p.c.L.Lock()
p.done = true //added
p.c.Broadcast()
p.c.L.Unlock()
}
func (p *processor) process(f int) {
fmt.Printf("fork : %d\n", f)
p.c.L.Lock()
p.started <- f
for !p.done { //added
p.c.Wait()
}
p.c.L.Unlock()
fmt.Printf("process: %d - out of wait\n", f)
}
我正在尝试使用 sync.Cond - 等待和广播。我无法理解其中的某些部分:
Wait calls 的评论说:
41 // Because c.L is not locked when Wait first resumes, the caller
42 // typically cannot assume that the condition is true when
43 // Wait returns. Instead, the caller should Wait in a loop:
44 //
45 // c.L.Lock()
46 // for !condition() {
47 // c.Wait()
48 // }
49 // ... make use of condition ...
50 // c.L.Unlock()
需要这样做的原因是什么?
所以这意味着以下程序可能不正确(虽然它有效):
package main
import (
"bufio"
"fmt"
"os"
"sync"
)
type processor struct {
n int
c *sync.Cond
started chan int
}
func newProcessor(n int) *processor {
p := new(processor)
p.n = n
p.c = sync.NewCond(&sync.Mutex{})
p.started = make(chan int, n)
return p
}
func (p *processor) start() {
for i := 0; i < p.n; i++ {
go p.process(i)
}
for i := 0; i < p.n; i++ {
<-p.started
}
p.c.L.Lock()
p.c.Broadcast()
p.c.L.Unlock()
}
func (p *processor) process(f int) {
fmt.Printf("fork : %d\n", f)
p.c.L.Lock()
p.started <- f
p.c.Wait()
p.c.L.Unlock()
fmt.Printf("process: %d - out of wait\n", f)
}
func main() {
p := newProcessor(5)
p.start()
reader := bufio.NewReader(os.Stdin)
_,_ =reader.ReadString('\n')
}
条件变量不会保持信号状态,它们只会唤醒阻塞在 .Wait() 中的其他 go 例程。所以这会出现竞争条件,除非你有一个谓词来检查你是否需要等待,或者你想要等待的事情是否已经发生。
在您的特定情况下,您已经通过使用您的 p.started
通道在调用 .Wait()
的 go 例程和调用 .BroadCast()
的例程之间添加了同步,其方式尽可能我知道不应该出现我在 post 中进一步描述的竞争条件。虽然我不会打赌,但就我个人而言,我只是按照文档描述的惯用方式来做。
考虑您的 start()
函数正在这些行中执行广播:
p.c.L.Lock()
p.c.Broadcast()
在那个特定的时间点,考虑到您的其他 go 例程之一已经在您的 process()
函数中达到了这一点
fmt.Printf("fork : %d\n", f)
go 例程要做的下一件事是锁定互斥锁(至少在 start()
中的 go 例程释放该互斥锁之前它不会拥有它)并等待条件变量。
p.c.L.Lock()
p.started <- f
p.c.Wait()
但是 Wait 永远不会 return,因为此时没有人会 signal/broadcast 它 - 信号已经发生。
所以你需要另一个你可以测试自己的条件,这样当你已经知道条件已经发生时就不需要调用 Wait(),例如
type processor struct {
n int
c *sync.Cond
started chan int
done bool //added
}
...
func (p *processor) start() {
for i := 0; i < p.n; i++ {
go p.process(i)
}
for i := 0; i < p.n; i++ {
<-p.started
}
p.c.L.Lock()
p.done = true //added
p.c.Broadcast()
p.c.L.Unlock()
}
func (p *processor) process(f int) {
fmt.Printf("fork : %d\n", f)
p.c.L.Lock()
p.started <- f
for !p.done { //added
p.c.Wait()
}
p.c.L.Unlock()
fmt.Printf("process: %d - out of wait\n", f)
}