检查是否有人从 go 频道阅读

Check if someone has read from go channel

我们如何在 go 频道上设置监听器之类的东西,当有人从频道中阅读某些内容时通知我们?

想象一下,我们有一个 sequence number 用于通道条目,我们想在有人从我们的包中某处的通道读取值时减少它。

您可以在手动模式下完成。对消息实施某种 ACK 标记。 像这样:

type Msg struct {
    Data int
    ack  bool
}

func (m *Msg) Ack() {
    m.ack = true
}

func (m *Msg) Acked() bool {
    return m.ack
}

func main() {
    ch := make(chan *Msg)
    msg := &Msg{Data: 1}
    go func() {
        for {
            if msg.Acked() {
                // do smth
            }
            time.Sleep(10 * time.Second)
        }
    }()
    ch <- msg

    for msg := range ch {
        msg.Ack()
    }
}

代码未经测试。 您还可以在 Ack() 方法中添加一些额外的信息,比如关于 package 和 func 的元信息,从哪里调用 Ack() ,这个答案可能是相关的:

无缓冲通道同步传递数据,因此您已经知道何时读取数据。当缓冲区已满时,缓冲通道的工作方式类似,但在其他情况下它们不会阻塞,因此这种方法不会告诉您完全相同的事情。根据您的实际需求,还可以考虑使用 sync.WaitGroup.

等工具
ch = make(chan Data)
  ⋮
for {
      ⋮
    // make data available
    ch <- data

    // now you know it was read
    sequenceNumber--
      ⋮
}

您可以创建通道中继机制,以实时捕获读取事件。

例如:

func relayer(in <-chan MyStruct) <-chan MyStruct {
        out := make(chan MyStruct) // non-buffered chan (see below)

        go func() {     
                defer close(out)
                readCountLimit := 10

                for item := range in {
                        out <- item
                        // ^^^^ so this will block until some worker has read from 'out'
                        readCountLimit--
                }
        }()     
        return out      
}

用法:

type MyStruct struct {
        // put your data fields here
}

ch := make(chan MyStruct) // <- original channel - used by producer to write to

rch := relayer(ch) // <- relay channel - used to read from

// consumers
go worker("worker 1", rch)
go worker("worker 2", rch)

// producer
for { ch <- MyStruct{} }