如何正确使用sync.Cond?
How to correctly use sync.Cond?
我不知道如何正确使用 sync.Cond
。据我所知,锁定 Locker 和调用条件的 Wait 方法之间存在竞争条件。这个例子在主 goroutine 的两行之间添加了一个人为的延迟来模拟竞争条件:
package main
import (
"sync"
"time"
)
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
time.Sleep(1 * time.Second)
c.Broadcast()
}()
m.Lock()
time.Sleep(2 * time.Second)
c.Wait()
}
这会立即引起恐慌:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
/usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
/usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
/tmp/sandbox301865429/main.go:17 +0x1a0
我做错了什么?我如何避免这种明显的竞争条件?我应该使用更好的同步结构吗?
编辑: 我意识到我应该更好地解释我试图在这里解决的问题。我有一个 long-running goroutine 可以下载一个大文件,还有许多其他 goroutines 需要在可用时访问 HTTP headers。这个问题比听起来更难。
我不能使用通道,因为只有一个 goroutine 会收到该值。并且其他一些 goroutine 会在 headers 已经可用很久之后尝试检索它们。
下载器 goroutine 可以简单地将 HTTP headers 存储在一个变量中,并使用互斥锁来保护对它们的访问。但是,这并没有为其他 goroutines 提供一种方法 "wait" 让它们变得可用。
我原以为 sync.Mutex
和 sync.Cond
一起可以实现这个目标,但看来这是不可能的。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.Lock() // main gouroutine is owner of lock
c := sync.NewCond(&m)
go func() {
m.Lock() // obtain a lock
defer m.Unlock()
fmt.Println("3. goroutine is owner of lock")
time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
c.Broadcast() // State has been changed, publish it to waiting goroutines
fmt.Println("4. goroutine will release lock soon (deffered Unlock")
}()
fmt.Println("1. main goroutine is owner of lock")
time.Sleep(1 * time.Second) // initialization
fmt.Println("2. main goroutine is still lockek")
c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
// Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
m.Unlock()
fmt.Println("Done")
}
看起来你 c.Wait 用于广播,这在你的时间间隔内永远不会发生。
有
time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()
您的代码片段似乎有效 http://play.golang.org/p/OE8aP4i6gY。或者我是否遗漏了您试图实现的目标?
我终于找到了一种方法,它根本不涉及 sync.Cond
- 只涉及互斥体。
type Task struct {
m sync.Mutex
headers http.Header
}
func NewTask() *Task {
t := &Task{}
t.m.Lock()
go func() {
defer t.m.Unlock()
// ...do stuff...
}()
return t
}
func (t *Task) WaitFor() http.Header {
t.m.Lock()
defer t.m.Unlock()
return t.headers
}
这是如何工作的?
互斥锁在任务开始时被锁定,确保任何调用 WaitFor()
的东西都会被阻塞。一旦 headers 可用并且互斥量被 goroutine 解锁,对 WaitFor()
的每次调用将一次执行一个。所有未来的调用(即使在 goroutine 结束之后)锁定互斥量都没有问题,因为它总是保持解锁状态。
OP自己回答的,但是没有直接回答原问题,我准备post如何正确使用sync.Cond
。
如果每个写入和读取都有一个 goroutine,那么您实际上并不需要 sync.Cond
- 一个 sync.Mutex
就足以在它们之间进行通信。 sync.Cond
在多个读者等待共享资源可用的情况下很有用。
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
话虽如此,如果情况允许,使用通道仍然是推荐的传递数据的方式。
注意:sync.WaitGroup
这里只是用来等待goroutines执行完毕。
您需要确保 c.Broadcast 在您调用 c.Wait 之后 被调用。您的程序的正确版本是:
package main
import (
"fmt"
"sync"
)
func main() {
m := &sync.Mutex{}
c := sync.NewCond(m)
m.Lock()
go func() {
m.Lock() // Wait for c.Wait()
c.Broadcast()
m.Unlock()
}()
c.Wait() // Unlocks m, waits, then locks m again
m.Unlock()
}
这是一个包含两个 go 例程的实际示例。他们一个接一个地开始,但第二个在继续之前等待第一个广播的条件:
package main
import (
"sync"
"fmt"
"time"
)
func main() {
lock := sync.Mutex{}
lock.Lock()
cond := sync.NewCond(&lock)
waitGroup := sync.WaitGroup{}
waitGroup.Add(2)
go func() {
defer waitGroup.Done()
fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")
time.Sleep(1 * time.Second)
fmt.Println("First go routine broadcasts condition")
cond.Broadcast()
}()
go func() {
defer waitGroup.Done()
fmt.Println("Second go routine has started and is waiting on condition")
cond.Wait()
fmt.Println("Second go routine unlocked by condition broadcast")
}()
fmt.Println("Main go routine starts waiting")
waitGroup.Wait()
fmt.Println("Main go routine ends")
}
输出可能略有不同,因为第二个 go 例程可能在第一个例程之前开始,反之亦然:
Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends
https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70
是的,您可以使用一个通道将 Header 传递给多个 Go 例程。
headerChan := make(chan http.Header)
go func() { // This routine can be started many times
header := <-headerChan // Wait for header
// Do things with the header
}()
// Feed the header to all waiting go routines
for more := true; more; {
select {
case headerChan <- r.Header:
default: more = false
}
}
在优秀的书中 "Concurrency in Go" 他们提供了以下简单的解决方案,同时利用关闭的通道将释放所有等待的客户端这一事实。
package main
import (
"fmt"
"time"
)
func main() {
httpHeaders := []string{}
headerChan := make(chan interface{})
var consumerFunc= func(id int, stream <-chan interface{}, funcHeaders *[]string)
{
<-stream
fmt.Println("Consumer ",id," got headers:", funcHeaders )
}
for i:=0;i<3;i++ {
go consumerFunc(i, headerChan, &httpHeaders)
}
fmt.Println("Getting headers...")
time.Sleep(2*time.Second)
httpHeaders=append(httpHeaders, "test1");
fmt.Println("Publishing headers...")
close(headerChan )
time.Sleep(5*time.Second)
}
我不知道如何正确使用 sync.Cond
。据我所知,锁定 Locker 和调用条件的 Wait 方法之间存在竞争条件。这个例子在主 goroutine 的两行之间添加了一个人为的延迟来模拟竞争条件:
package main
import (
"sync"
"time"
)
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
time.Sleep(1 * time.Second)
c.Broadcast()
}()
m.Lock()
time.Sleep(2 * time.Second)
c.Wait()
}
这会立即引起恐慌:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Syncsemacquire(0x10330208, 0x1) /usr/local/go/src/runtime/sema.go:241 +0x2e0 sync.(*Cond).Wait(0x10330200, 0x0) /usr/local/go/src/sync/cond.go:63 +0xe0 main.main() /tmp/sandbox301865429/main.go:17 +0x1a0
我做错了什么?我如何避免这种明显的竞争条件?我应该使用更好的同步结构吗?
编辑: 我意识到我应该更好地解释我试图在这里解决的问题。我有一个 long-running goroutine 可以下载一个大文件,还有许多其他 goroutines 需要在可用时访问 HTTP headers。这个问题比听起来更难。
我不能使用通道,因为只有一个 goroutine 会收到该值。并且其他一些 goroutine 会在 headers 已经可用很久之后尝试检索它们。
下载器 goroutine 可以简单地将 HTTP headers 存储在一个变量中,并使用互斥锁来保护对它们的访问。但是,这并没有为其他 goroutines 提供一种方法 "wait" 让它们变得可用。
我原以为 sync.Mutex
和 sync.Cond
一起可以实现这个目标,但看来这是不可能的。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.Lock() // main gouroutine is owner of lock
c := sync.NewCond(&m)
go func() {
m.Lock() // obtain a lock
defer m.Unlock()
fmt.Println("3. goroutine is owner of lock")
time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
c.Broadcast() // State has been changed, publish it to waiting goroutines
fmt.Println("4. goroutine will release lock soon (deffered Unlock")
}()
fmt.Println("1. main goroutine is owner of lock")
time.Sleep(1 * time.Second) // initialization
fmt.Println("2. main goroutine is still lockek")
c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
// Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
m.Unlock()
fmt.Println("Done")
}
看起来你 c.Wait 用于广播,这在你的时间间隔内永远不会发生。 有
time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()
您的代码片段似乎有效 http://play.golang.org/p/OE8aP4i6gY。或者我是否遗漏了您试图实现的目标?
我终于找到了一种方法,它根本不涉及 sync.Cond
- 只涉及互斥体。
type Task struct {
m sync.Mutex
headers http.Header
}
func NewTask() *Task {
t := &Task{}
t.m.Lock()
go func() {
defer t.m.Unlock()
// ...do stuff...
}()
return t
}
func (t *Task) WaitFor() http.Header {
t.m.Lock()
defer t.m.Unlock()
return t.headers
}
这是如何工作的?
互斥锁在任务开始时被锁定,确保任何调用 WaitFor()
的东西都会被阻塞。一旦 headers 可用并且互斥量被 goroutine 解锁,对 WaitFor()
的每次调用将一次执行一个。所有未来的调用(即使在 goroutine 结束之后)锁定互斥量都没有问题,因为它总是保持解锁状态。
OP自己回答的,但是没有直接回答原问题,我准备post如何正确使用sync.Cond
。
如果每个写入和读取都有一个 goroutine,那么您实际上并不需要 sync.Cond
- 一个 sync.Mutex
就足以在它们之间进行通信。 sync.Cond
在多个读者等待共享资源可用的情况下很有用。
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
话虽如此,如果情况允许,使用通道仍然是推荐的传递数据的方式。
注意:sync.WaitGroup
这里只是用来等待goroutines执行完毕。
您需要确保 c.Broadcast 在您调用 c.Wait 之后 被调用。您的程序的正确版本是:
package main
import (
"fmt"
"sync"
)
func main() {
m := &sync.Mutex{}
c := sync.NewCond(m)
m.Lock()
go func() {
m.Lock() // Wait for c.Wait()
c.Broadcast()
m.Unlock()
}()
c.Wait() // Unlocks m, waits, then locks m again
m.Unlock()
}
这是一个包含两个 go 例程的实际示例。他们一个接一个地开始,但第二个在继续之前等待第一个广播的条件:
package main
import (
"sync"
"fmt"
"time"
)
func main() {
lock := sync.Mutex{}
lock.Lock()
cond := sync.NewCond(&lock)
waitGroup := sync.WaitGroup{}
waitGroup.Add(2)
go func() {
defer waitGroup.Done()
fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")
time.Sleep(1 * time.Second)
fmt.Println("First go routine broadcasts condition")
cond.Broadcast()
}()
go func() {
defer waitGroup.Done()
fmt.Println("Second go routine has started and is waiting on condition")
cond.Wait()
fmt.Println("Second go routine unlocked by condition broadcast")
}()
fmt.Println("Main go routine starts waiting")
waitGroup.Wait()
fmt.Println("Main go routine ends")
}
输出可能略有不同,因为第二个 go 例程可能在第一个例程之前开始,反之亦然:
Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends
https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70
是的,您可以使用一个通道将 Header 传递给多个 Go 例程。
headerChan := make(chan http.Header)
go func() { // This routine can be started many times
header := <-headerChan // Wait for header
// Do things with the header
}()
// Feed the header to all waiting go routines
for more := true; more; {
select {
case headerChan <- r.Header:
default: more = false
}
}
在优秀的书中 "Concurrency in Go" 他们提供了以下简单的解决方案,同时利用关闭的通道将释放所有等待的客户端这一事实。
package main
import (
"fmt"
"time"
)
func main() {
httpHeaders := []string{}
headerChan := make(chan interface{})
var consumerFunc= func(id int, stream <-chan interface{}, funcHeaders *[]string)
{
<-stream
fmt.Println("Consumer ",id," got headers:", funcHeaders )
}
for i:=0;i<3;i++ {
go consumerFunc(i, headerChan, &httpHeaders)
}
fmt.Println("Getting headers...")
time.Sleep(2*time.Second)
httpHeaders=append(httpHeaders, "test1");
fmt.Println("Publishing headers...")
close(headerChan )
time.Sleep(5*time.Second)
}