为什么基于通道的 Lock 块?
why channel-based Lock blocks?
您好,我正在使用通道编写 Lock
,旨在 Lock/Unlock 给定 'app' 的操作。
一般的想法是,一个 goroutine 一直监听两个通道:lockCh
和 unlockCh
。任何 Lock()
操作发送一个自制通道到 lockCh
,并等待从那个自制通道读取,从这个通道读取完成意味着 Lock()
成功。
类似的过程适用于 Unlock()
。
对于侦听器 gorouting,它会在接受 Lock()
时检查 'app' 是否已被锁定,如果是,则将那个自制通道放在 waiters-list 的尾部。如果有人 Unlock()
,它会唤醒(通过向通道发送消息)下一个服务员,或者如果没有其他人在等待锁,则删除服务员列表。
代码在下面,我不知道哪里错了,但是测试用例就是不通过(它在几个Lock()
和Unlock()
之后阻塞!)
谢谢你给我一些建议。
type receiver struct {
app string
ch chan struct{}
next *receiver
}
type receiveList struct {
head *receiver
tail *receiver
}
type AppLock struct {
lockCh chan receiver
unlockCh chan receiver
// Consider lock x:
// if map[x] doesn't exist, x is unlocked
// if map[x] exist but head is nil, x is locked but no waiter
// if map[x] exist and head isn't nil, x is locked and there're waiters
m map[string]receiveList
}
func NewAppLock() *AppLock {
l := new(AppLock)
l.lockCh = make(chan receiver)
l.unlockCh = make(chan receiver)
l.m = make(map[string]receiveList)
go l.lockRoutine()
return l
}
func (l *AppLock) Lock(app string) {
ch := make(chan struct{})
l.lockCh <- receiver{
app: app,
ch: ch,
}
<-ch
}
func (l *AppLock) Unlock(app string) {
ch := make(chan struct{})
l.unlockCh <- receiver{
app: app,
ch: ch,
}
<-ch
}
func (l *AppLock) lockRoutine() {
for {
select {
case r := <-l.lockCh:
rlist, ok := l.m[r.app]
if ok { // already locked
if rlist.head == nil { // no waiter
rlist.head = &r
rlist.tail = &r
} else { // there're waiters, wait in queue
rlist.tail.next = &r
rlist.tail = &r
}
} else { // unlocked
rlist = receiveList{}
l.m[r.app] = rlist
r.ch <- struct{}{}
}
case r := <-l.unlockCh:
rlist, ok := l.m[r.app]
if ok {
if rlist.head == nil { // no waiter
delete(l.m, r.app)
r.ch <- struct{}{}
} else { // there're waiters
candidate := rlist.head
if rlist.head == rlist.tail {
rlist.head = nil
rlist.tail = nil
} else {
rlist.head = rlist.head.next
}
candidate.ch <- struct{}{}
r.ch <- struct{}{}
}
} else {
panic("AppLock: inconsistent lock state")
}
}
}
}
测试:
func main() {
app := "APP"
appLock := NewAppLock()
c := make(chan bool)
for i := 0; i < 10; i++ {
go func(l *AppLock, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
l.Lock(app)
l.Unlock(app)
}
cdone <- true
}(appLock, 1, c)
}
for i := 0; i < 10; i++ {
<-c
}
fmt.Println("DONE")
}
我认为问题出在您的 main
for
循环上。您不会在导致死锁的任何地方关闭通道 c
。尝试将循环更改为:
for i := 0; i < 10; i++ {
var wg sync.WaitGroup // use WaitGroup
wg.Add(1)
go func(l *AppLock, loops int, cdone chan bool) {
defer wg.Done()
for i := 0; i < loops; i++ {
l.Lock(app)
l.Unlock(app)
}
cdone <- true
}(appLock, 1, c)
go func() {
wg.Wait()
close(c)
}()
}
工作代码:https://play.golang.org/p/cbXj0CPTGO
虽然这应该可以解决死锁问题,但这并不一定意味着程序的行为是正确的。您需要更好的测试才能解决这个问题。
是否需要实现包含链表的状态机?可能有一个更简单的解决方案,使用 Go 的正常通道争用来提供队列。
因此,客户端排队等待访问无缓冲通道。锁定服务只是等待客户端。当一个到达时,锁定服务进入锁定状态。使用无缓冲通道发出解锁信号,状态 returns 解锁。
锁定时,锁定服务会忽略后续获得锁定的请求。这些客户端必须等待(Go 运行时为此提供了一个队列)。每次解锁后,服务就可以接下一个客户端。
type AppLock struct {
lockCh chan struct{}
unlockCh chan struct{}
}
func NewAppLock() *AppLock {
l := new(AppLock)
l.lockCh = make(chan struct{})
l.unlockCh = make(chan struct{})
go l.lockRoutine()
return l
}
func (l *AppLock) lockRoutine() {
for {
// take the next client off the channel
<- l.lockCh
// we are now in a locked state; wait until the client has released
// which it signals via the channel c
<- l.unlockCh
}
}
func (l *AppLock) Lock() {
// join the queue for a lock
l.lockCh <- struct{}{}
}
func (l *AppLock) Unlock() {
l.unlockCh <- struct{}{}
}
并且调用循环包含
for i := 0; i < loops; i++ {
l.Lock()
l.Unlock()
}
重要的是通道是无缓冲的,因为客户端必须等待完成会合才能知道他们已经获取(l.Lock()
)或释放(l.Unlock()
)锁。
https://play.golang.org/p/2j4lkeOR_s
PS
我不禁想知道你是否只需要 RWMutex (https://golang.org/pkg/sync/#RWMutex)
我刚刚发现我的代码中有错误。
type AppLock struct {
lockCh chan receiver
unlockCh chan receiver
// Consider lock x:
// if map[x] doesn't exist, x is unlocked
// if map[x] exist but head is nil, x is locked but no waiter
// if map[x] exist and head isn't nil, x is locked and there're waiters
m map[string]receiveList
}
本来我以为receiveList
里面的head
和tail
都是指针,所以我们总是可以对同一个waiters-list进行操作,即使receiveList
不是't 指针类型。 (很明显是错的)
确实,如果我们只从head
和tail
读取,而不使用receiveList
的指针类型,那是可以的。但是,我确实在 lockRoutine 中写入它们,它正在写入它们的副本。就是这个问题。
有一个更简单的解决方案。单时隙缓冲通道已经起到了简单储物柜的作用。不需要管理器例程或链表队列:
// AppLock maintains a single exclusive lock for each app name string provided
type AppLock struct {
// Consider lock x:
// if map[x] doesn't exist, x is implicitly unlocked
// if map[x] exist and can be read from, x is unlocked
// if map[x] exist and blocks on read, x is locked
m map[string]chan struct{}
// RWMutex to protect the map writes, but allow free read access
mut sync.RWMutex
}
func NewAppLock() *AppLock {
l := new(AppLock)
l.m = make(map[string]chan struct{})
return l
}
func (l *AppLock) Lock(app string) {
l.mut.RLock()
ch, ok := l.m[app]
l.mut.RUnlock()
if ok {
<-ch // blocks until lock acquired
} else {
l.mut.Lock()
l.m[app] = make(chan struct{}, 1)
// we don't want to put anything on it yet, because we still want the lock
l.mut.Unlock()
}
}
func (l *AppLock) Unlock(app string) {
l.mut.RLock()
ch, ok := l.m[app]
l.mut.RUnlock()
if ok {
select {
case ch <- struct{}{}:
// do nothing, expected behavior
default:
// something already on channel, which means it wasn't locked
panic("Unlock called on app not previously locked")
}
} else {
panic("Unlock called on app not previously locked")
}
}
https://play.golang.org/p/o02BHfRto3
这为每个应用维护了一个通道,缓冲 1。要锁定,您尝试从通道中读取。如果你收到一些东西(这只是一个代表锁的通用令牌),你就得到了锁,然后将令牌放回通道以解锁它。如果接收块,其他东西有锁,并且当其他例程解锁时接收将解除阻塞(将令牌放回通道)。
顺便说一下,这仍然是一个先进先出的系统,因为 Go 通道是有序的,并且将填充最早开始接收的接收(在那些仍在尝试接收的通道中)。
您好,我正在使用通道编写 Lock
,旨在 Lock/Unlock 给定 'app' 的操作。
一般的想法是,一个 goroutine 一直监听两个通道:lockCh
和 unlockCh
。任何 Lock()
操作发送一个自制通道到 lockCh
,并等待从那个自制通道读取,从这个通道读取完成意味着 Lock()
成功。
类似的过程适用于 Unlock()
。
对于侦听器 gorouting,它会在接受 Lock()
时检查 'app' 是否已被锁定,如果是,则将那个自制通道放在 waiters-list 的尾部。如果有人 Unlock()
,它会唤醒(通过向通道发送消息)下一个服务员,或者如果没有其他人在等待锁,则删除服务员列表。
代码在下面,我不知道哪里错了,但是测试用例就是不通过(它在几个Lock()
和Unlock()
之后阻塞!)
谢谢你给我一些建议。
type receiver struct {
app string
ch chan struct{}
next *receiver
}
type receiveList struct {
head *receiver
tail *receiver
}
type AppLock struct {
lockCh chan receiver
unlockCh chan receiver
// Consider lock x:
// if map[x] doesn't exist, x is unlocked
// if map[x] exist but head is nil, x is locked but no waiter
// if map[x] exist and head isn't nil, x is locked and there're waiters
m map[string]receiveList
}
func NewAppLock() *AppLock {
l := new(AppLock)
l.lockCh = make(chan receiver)
l.unlockCh = make(chan receiver)
l.m = make(map[string]receiveList)
go l.lockRoutine()
return l
}
func (l *AppLock) Lock(app string) {
ch := make(chan struct{})
l.lockCh <- receiver{
app: app,
ch: ch,
}
<-ch
}
func (l *AppLock) Unlock(app string) {
ch := make(chan struct{})
l.unlockCh <- receiver{
app: app,
ch: ch,
}
<-ch
}
func (l *AppLock) lockRoutine() {
for {
select {
case r := <-l.lockCh:
rlist, ok := l.m[r.app]
if ok { // already locked
if rlist.head == nil { // no waiter
rlist.head = &r
rlist.tail = &r
} else { // there're waiters, wait in queue
rlist.tail.next = &r
rlist.tail = &r
}
} else { // unlocked
rlist = receiveList{}
l.m[r.app] = rlist
r.ch <- struct{}{}
}
case r := <-l.unlockCh:
rlist, ok := l.m[r.app]
if ok {
if rlist.head == nil { // no waiter
delete(l.m, r.app)
r.ch <- struct{}{}
} else { // there're waiters
candidate := rlist.head
if rlist.head == rlist.tail {
rlist.head = nil
rlist.tail = nil
} else {
rlist.head = rlist.head.next
}
candidate.ch <- struct{}{}
r.ch <- struct{}{}
}
} else {
panic("AppLock: inconsistent lock state")
}
}
}
}
测试:
func main() {
app := "APP"
appLock := NewAppLock()
c := make(chan bool)
for i := 0; i < 10; i++ {
go func(l *AppLock, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
l.Lock(app)
l.Unlock(app)
}
cdone <- true
}(appLock, 1, c)
}
for i := 0; i < 10; i++ {
<-c
}
fmt.Println("DONE")
}
我认为问题出在您的 main
for
循环上。您不会在导致死锁的任何地方关闭通道 c
。尝试将循环更改为:
for i := 0; i < 10; i++ {
var wg sync.WaitGroup // use WaitGroup
wg.Add(1)
go func(l *AppLock, loops int, cdone chan bool) {
defer wg.Done()
for i := 0; i < loops; i++ {
l.Lock(app)
l.Unlock(app)
}
cdone <- true
}(appLock, 1, c)
go func() {
wg.Wait()
close(c)
}()
}
工作代码:https://play.golang.org/p/cbXj0CPTGO
虽然这应该可以解决死锁问题,但这并不一定意味着程序的行为是正确的。您需要更好的测试才能解决这个问题。
是否需要实现包含链表的状态机?可能有一个更简单的解决方案,使用 Go 的正常通道争用来提供队列。
因此,客户端排队等待访问无缓冲通道。锁定服务只是等待客户端。当一个到达时,锁定服务进入锁定状态。使用无缓冲通道发出解锁信号,状态 returns 解锁。
锁定时,锁定服务会忽略后续获得锁定的请求。这些客户端必须等待(Go 运行时为此提供了一个队列)。每次解锁后,服务就可以接下一个客户端。
type AppLock struct {
lockCh chan struct{}
unlockCh chan struct{}
}
func NewAppLock() *AppLock {
l := new(AppLock)
l.lockCh = make(chan struct{})
l.unlockCh = make(chan struct{})
go l.lockRoutine()
return l
}
func (l *AppLock) lockRoutine() {
for {
// take the next client off the channel
<- l.lockCh
// we are now in a locked state; wait until the client has released
// which it signals via the channel c
<- l.unlockCh
}
}
func (l *AppLock) Lock() {
// join the queue for a lock
l.lockCh <- struct{}{}
}
func (l *AppLock) Unlock() {
l.unlockCh <- struct{}{}
}
并且调用循环包含
for i := 0; i < loops; i++ {
l.Lock()
l.Unlock()
}
重要的是通道是无缓冲的,因为客户端必须等待完成会合才能知道他们已经获取(l.Lock()
)或释放(l.Unlock()
)锁。
https://play.golang.org/p/2j4lkeOR_s
PS
我不禁想知道你是否只需要 RWMutex (https://golang.org/pkg/sync/#RWMutex)
我刚刚发现我的代码中有错误。
type AppLock struct {
lockCh chan receiver
unlockCh chan receiver
// Consider lock x:
// if map[x] doesn't exist, x is unlocked
// if map[x] exist but head is nil, x is locked but no waiter
// if map[x] exist and head isn't nil, x is locked and there're waiters
m map[string]receiveList
}
本来我以为receiveList
里面的head
和tail
都是指针,所以我们总是可以对同一个waiters-list进行操作,即使receiveList
不是't 指针类型。 (很明显是错的)
确实,如果我们只从head
和tail
读取,而不使用receiveList
的指针类型,那是可以的。但是,我确实在 lockRoutine 中写入它们,它正在写入它们的副本。就是这个问题。
有一个更简单的解决方案。单时隙缓冲通道已经起到了简单储物柜的作用。不需要管理器例程或链表队列:
// AppLock maintains a single exclusive lock for each app name string provided
type AppLock struct {
// Consider lock x:
// if map[x] doesn't exist, x is implicitly unlocked
// if map[x] exist and can be read from, x is unlocked
// if map[x] exist and blocks on read, x is locked
m map[string]chan struct{}
// RWMutex to protect the map writes, but allow free read access
mut sync.RWMutex
}
func NewAppLock() *AppLock {
l := new(AppLock)
l.m = make(map[string]chan struct{})
return l
}
func (l *AppLock) Lock(app string) {
l.mut.RLock()
ch, ok := l.m[app]
l.mut.RUnlock()
if ok {
<-ch // blocks until lock acquired
} else {
l.mut.Lock()
l.m[app] = make(chan struct{}, 1)
// we don't want to put anything on it yet, because we still want the lock
l.mut.Unlock()
}
}
func (l *AppLock) Unlock(app string) {
l.mut.RLock()
ch, ok := l.m[app]
l.mut.RUnlock()
if ok {
select {
case ch <- struct{}{}:
// do nothing, expected behavior
default:
// something already on channel, which means it wasn't locked
panic("Unlock called on app not previously locked")
}
} else {
panic("Unlock called on app not previously locked")
}
}
https://play.golang.org/p/o02BHfRto3
这为每个应用维护了一个通道,缓冲 1。要锁定,您尝试从通道中读取。如果你收到一些东西(这只是一个代表锁的通用令牌),你就得到了锁,然后将令牌放回通道以解锁它。如果接收块,其他东西有锁,并且当其他例程解锁时接收将解除阻塞(将令牌放回通道)。
顺便说一下,这仍然是一个先进先出的系统,因为 Go 通道是有序的,并且将填充最早开始接收的接收(在那些仍在尝试接收的通道中)。