为什么基于通道的 Lock 块?

why channel-based Lock blocks?

您好,我正在使用通道编写 Lock,旨在 Lock/Unlock 给定 'app' 的操作。

一般的想法是,一个 goroutine 一直监听两个通道:lockChunlockCh。任何 Lock() 操作发送一个自制通道到 lockCh,并等待从那个自制通道读取,从这个通道读取完成意味着 Lock() 成功。

类似的过程适用于 Unlock()

对于侦听器 gorouting,它会在接受 Lock() 时检查 'app' 是否已被锁定,如果是,则将那个自制通道放在 waiters-list 的尾部。如果有人 Unlock(),它会唤醒(通过向通道发送消息)下一个服务员,或者如果没有其他人在等待锁,则删除服务员列表。

代码在下面,我不知道哪里错了,但是测试用例就是不通过(它在几个Lock()Unlock()之后阻塞!)

谢谢你给我一些建议。

type receiver struct {
            app  string
            ch   chan struct{}
            next *receiver
}

type receiveList struct {
            head *receiver
            tail *receiver
}

type AppLock struct {
            lockCh   chan receiver
            unlockCh chan receiver

            // Consider lock x:
            // if map[x] doesn't exist, x is unlocked
            // if map[x] exist but head is nil, x is locked but no waiter
            // if map[x] exist and head isn't nil, x is locked and there're waiters
            m map[string]receiveList
}

func NewAppLock() *AppLock {
            l := new(AppLock)
            l.lockCh = make(chan receiver)
            l.unlockCh = make(chan receiver)
            l.m = make(map[string]receiveList)

            go l.lockRoutine()
            return l
}

func (l *AppLock) Lock(app string) {
            ch := make(chan struct{})
            l.lockCh <- receiver{
                app: app,
                ch:  ch,
            }
            <-ch
}

func (l *AppLock) Unlock(app string) {
            ch := make(chan struct{})
            l.unlockCh <- receiver{
                app: app,
                ch:  ch,
            }
            <-ch
}

func (l *AppLock) lockRoutine() {
            for {
                select {
                case r := <-l.lockCh:
                    rlist, ok := l.m[r.app]
                    if ok { // already locked
                        if rlist.head == nil { // no waiter
                            rlist.head = &r
                            rlist.tail = &r
                        } else { // there're waiters, wait in queue
                            rlist.tail.next = &r
                            rlist.tail = &r
                        }
                    } else { // unlocked
                        rlist = receiveList{}
                        l.m[r.app] = rlist
                        r.ch <- struct{}{}
                    }
                case r := <-l.unlockCh:
                    rlist, ok := l.m[r.app]
                    if ok {
                        if rlist.head == nil { // no waiter
                            delete(l.m, r.app)
                            r.ch <- struct{}{}
                        } else { // there're waiters
                            candidate := rlist.head
                            if rlist.head == rlist.tail {
                                rlist.head = nil
                                rlist.tail = nil
                            } else {
                                rlist.head = rlist.head.next
                            }
                            candidate.ch <- struct{}{}
                            r.ch <- struct{}{}
                        }
                    } else {
                        panic("AppLock: inconsistent lock state")
                    }
                }
            }
}

测试:

func main() {
        app := "APP"
        appLock := NewAppLock()
        c := make(chan bool)

        for i := 0; i < 10; i++ {
            go func(l *AppLock, loops int, cdone chan bool) {
                for i := 0; i < loops; i++ {
                    l.Lock(app)
                    l.Unlock(app)
                }
                cdone <- true
            }(appLock, 1, c)
        }

        for i := 0; i < 10; i++ {
            <-c
        }

    fmt.Println("DONE")
}

我认为问题出在您的 main for 循环上。您不会在导致死锁的任何地方关闭通道 c。尝试将循环更改为:

for i := 0; i < 10; i++ {
    var wg sync.WaitGroup // use WaitGroup
    wg.Add(1)
    go func(l *AppLock, loops int, cdone chan bool) {
        defer wg.Done()
        for i := 0; i < loops; i++ {
            l.Lock(app)
            l.Unlock(app)
        }
        cdone <- true
    }(appLock, 1, c)
    go func() {
        wg.Wait()
        close(c)
    }()
}

工作代码:https://play.golang.org/p/cbXj0CPTGO

虽然这应该可以解决死锁问题,但这并不一定意味着程序的行为是正确的。您需要更好的测试才能解决这个问题。

是否需要实现包含链表的状态机?可能有一个更简单的解决方案,使用 Go 的正常通道争用来提供队列。

因此,客户端排队等待访问无缓冲通道。锁定服务只是等待客户端。当一个到达时,锁定服务进入锁定状态。使用无缓冲通道发出解锁信号,状态 returns 解锁。

锁定时,锁定服务会忽略后续获得锁定的请求。这些客户端必须等待(Go 运行时为此提供了一个队列)。每次解锁后,服务就可以接下一个客户端。

type AppLock struct {
    lockCh   chan struct{}
    unlockCh chan struct{}
}

func NewAppLock() *AppLock {
    l := new(AppLock)
    l.lockCh = make(chan struct{})
    l.unlockCh = make(chan struct{})
    go l.lockRoutine()
    return l
}

func (l *AppLock) lockRoutine() {
    for {
        // take the next client off the channel
        <- l.lockCh
        // we are now in a locked state; wait until the client has released
        // which it signals via the channel c
        <- l.unlockCh
    }
}

func (l *AppLock) Lock() {
    // join the queue for a lock
    l.lockCh <- struct{}{}
}

func (l *AppLock) Unlock() {
    l.unlockCh <- struct{}{}
}

并且调用循环包含

        for i := 0; i < loops; i++ {
            l.Lock()
            l.Unlock()
        }

重要的是通道是无缓冲的,因为客户端必须等待完成会合才能知道他们已经获取(l.Lock())或释放(l.Unlock())锁。

https://play.golang.org/p/2j4lkeOR_s

PS

我不禁想知道你是否只需要 RWMutex (https://golang.org/pkg/sync/#RWMutex)

我刚刚发现我的代码中有错误。

type AppLock struct {
    lockCh   chan receiver
    unlockCh chan receiver

    // Consider lock x:
    // if map[x] doesn't exist, x is unlocked
    // if map[x] exist but head is nil, x is locked but no waiter
    // if map[x] exist and head isn't nil, x is locked and there're waiters
    m map[string]receiveList
}

本来我以为receiveList里面的headtail都是指针,所以我们总是可以对同一个waiters-list进行操作,即使receiveList不是't 指针类型。 (很明显是错的)

确实,如果我们只从headtail读取,而不使用receiveList的指针类型,那是可以的。但是,我确实在 lockRoutine 中写入它们,它正在写入它们的副本。就是这个问题。

有一个更简单的解决方案。单时隙缓冲通道已经起到了简单储物柜的作用。不需要管理器例程或链表队列:

// AppLock maintains a single exclusive lock for each app name string provided
type AppLock struct {
    // Consider lock x:
    // if map[x] doesn't exist, x is implicitly unlocked
    // if map[x] exist and can be read from, x is unlocked
    // if map[x] exist and blocks on read, x is locked
    m map[string]chan struct{}

    // RWMutex to protect the map writes, but allow free read access
    mut sync.RWMutex
}

func NewAppLock() *AppLock {
    l := new(AppLock)
    l.m = make(map[string]chan struct{})
    return l
}

func (l *AppLock) Lock(app string) {
    l.mut.RLock()
    ch, ok := l.m[app]
    l.mut.RUnlock()

    if ok {
        <-ch // blocks until lock acquired
    } else {
        l.mut.Lock()
        l.m[app] = make(chan struct{}, 1)
        // we don't want to put anything on it yet, because we still want the lock
        l.mut.Unlock()
    }
}

func (l *AppLock) Unlock(app string) {
    l.mut.RLock()
    ch, ok := l.m[app]
    l.mut.RUnlock()

    if ok {
        select {
        case ch <- struct{}{}:
            // do nothing, expected behavior
        default:
            // something already on channel, which means it wasn't locked
            panic("Unlock called on app not previously locked")
        }
    } else {
        panic("Unlock called on app not previously locked")
    }
}

https://play.golang.org/p/o02BHfRto3

这为每个应用维护了一个通道,缓冲 1。要锁定,您尝试从通道中读取。如果你收到一些东西(这只是一个代表锁的通用令牌),你就得到了锁,然后将令牌放回通道以解锁它。如果接收块,其他东西有锁,并且当其他例程解锁时接收将解除阻塞(将令牌放回通道)。

顺便说一下,这仍然是一个先进先出的系统,因为 Go 通道是有序的,并且将填充最早开始接收的接收(在那些仍在尝试接收的通道中)。