从接收方关闭通道:从多个 goroutine 访问 sync.Mutex 时出现死锁

Closing a channel from the receiver side: deadlock when accessing sync.Mutex from multiple goroutines

我正在尝试从接收端实现优雅的通道关闭。

是的,我知道这违反了频道关闭规则:

...don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

但是我想实现这样的逻辑。不幸的是,我在很多情况下都没有陷入死锁问题:应用程序只是无限期地挂起,试图再次锁定相同的锁定 Mutex

所以,我有 2 个协程:

  • 一个将写入通道和
  • 另一个将接收数据的 + 将从接收端关闭通道。

我的频道用 sync.Mutexclosed 布尔标志包装在结构中:

type Chan struct {
    sync.Mutex // can be replaced with deadlock.Mutex from "github.com/sasha-s/go-deadlock"
    data           chan int
    closed         bool
}

此结构上的所有 Send()Close()IsClosed() 操作都由 Mutex 保护,并为防止重复锁定具有 非线程安全方法版本 (send(), close(), isClosed()).

完整源代码:

package main

import (
    "log"
    "net/http"
    "sync"
)

func main() {
    log.Println("Start")

    ch := New(0) // unbuffered channel to expose problem faster

    wg := sync.WaitGroup{}
    wg.Add(2)

    // send data:
    go func(ch *Chan) {
        for i := 0; i < 100; i++ {
            ch.Send(i)
        }
        wg.Done()
    }(ch)

    // receive data and close from receiver side:
    go func(ch *Chan) {
        for data := range ch.data {
            log.Printf("Received %d data", data)
            // Bad practice: I want to close the channel from receiver's side:
            if data > 50 {
                ch.Close()
                break
            }
        }
        wg.Done()
    }(ch)

    wg.Wait()
    log.Println("End")
}

type Chan struct {
    deadlock.Mutex //sync.Mutex
    data           chan int
    closed         bool
}

func New(size int) *Chan {
    defer func() {
        log.Printf("Channel was created")
    }()
    return &Chan{
        data: make(chan int, size),
    }
}

func (c *Chan) Send(data int) {
    c.Lock()
    c.send(data)
    c.Unlock()
}

func (c *Chan) Close() {
    c.Lock()
    c.close()
    c.Unlock()
}

func (c *Chan) IsClosed() bool {
    c.Lock()
    defer c.Unlock()
    return c.isClosed()
}

// send is internal non-threadsafe api.
func (c *Chan) send(data int) {
    if !c.closed {
        c.data <- data
        log.Printf("Data %d was sent", data)
    }
}

// close is internal non-threadsafe api.
func (c *Chan) close() {
    if !c.closed {
        close(c.data)
        c.closed = true
        log.Println("Channel was closed")
    } else {
        log.Println("Channel was already closed")
    }
}

// isClosed is internal non-threadsafe api.
func (c *Chan) isClosed() bool {
    return c.closed
}

您可以 运行 这个程序在 sandbox.

在本地机器上,在少量 运行 秒后,30 秒后输出将是(使用 deadlock.Mutex 而不是 sync.Mutex):

2018/04/01 11:26:22 Data 50 was sent
2018/04/01 11:26:22 Received 50 data
2018/04/01 11:26:22 Data 51 was sent
2018/04/01 11:26:22 Received 51 data
POTENTIAL DEADLOCK:
Previous place where the lock was grabbed
goroutine 35 lock 0xc42015a040
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:71 main.(*Chan).Send { c.Lock() } <<<<<
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:30 main.main.func1 { ch.Send(i) }

Have been trying to lock it again for more than 30s
goroutine 36 lock 0xc42015a040
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:77 main.(*Chan).Close { c.Lock() } <<<<<
close-from-receiver-side/closeFromReceiverSideIsBadPractice.go:44 main.main.func2 { ch.Close() }

为什么会发生这种死锁以及如何修复此实现以避免死锁?


关闭发送方的通道不是解决办法。所以,这不是我的问题的解决方法:Example of closing channel from sender side.

Send 获取锁,然后尝试在通道中发送数据。这可能发生在第 50 次接收操作之后。将不再有接收,所以 c.data <- data 永远阻塞,因此互斥锁永远持有。

要取消,请在 Send 中使用另一个通道(而不是布尔值)和 select 语句。您还可以利用 the context package.

你可以尽情尝试:你必须从发送方关闭通道。

你也许可以在没有完全锁定的情况下让它工作,但你会泄漏 goroutines。发件人将永远阻止并且无法关闭。如果接收方想要触发关闭,它必须告诉发送方关闭通道。您如何告诉发件人关闭:

  • 您建议的布尔值(需要另一个互斥体)
  • 一个stop-通道,当关闭时向发送者发出关闭数据通道的信号(不能关闭多次)
  • a ctx.Context:调用cancel()函数将向发送方发出停止信号。 (可多次取消无忧)

(仅详细说明彼得斯的正确答案)