检查是否有人从 go 频道阅读
Check if someone has read from go channel
我们如何在 go 频道上设置监听器之类的东西,当有人从频道中阅读某些内容时通知我们?
想象一下,我们有一个 sequence number
用于通道条目,我们想在有人从我们的包中某处的通道读取值时减少它。
您可以在手动模式下完成。对消息实施某种 ACK
标记。
像这样:
type Msg struct {
Data int
ack bool
}
func (m *Msg) Ack() {
m.ack = true
}
func (m *Msg) Acked() bool {
return m.ack
}
func main() {
ch := make(chan *Msg)
msg := &Msg{Data: 1}
go func() {
for {
if msg.Acked() {
// do smth
}
time.Sleep(10 * time.Second)
}
}()
ch <- msg
for msg := range ch {
msg.Ack()
}
}
代码未经测试。
您还可以在 Ack()
方法中添加一些额外的信息,比如关于 package 和 func 的元信息,从哪里调用 Ack()
,这个答案可能是相关的:
无缓冲通道同步传递数据,因此您已经知道何时读取数据。当缓冲区已满时,缓冲通道的工作方式类似,但在其他情况下它们不会阻塞,因此这种方法不会告诉您完全相同的事情。根据您的实际需求,还可以考虑使用 sync.WaitGroup.
等工具
ch = make(chan Data)
⋮
for {
⋮
// make data available
ch <- data
// now you know it was read
sequenceNumber--
⋮
}
您可以创建通道中继机制,以实时捕获读取事件。
例如:
func relayer(in <-chan MyStruct) <-chan MyStruct {
out := make(chan MyStruct) // non-buffered chan (see below)
go func() {
defer close(out)
readCountLimit := 10
for item := range in {
out <- item
// ^^^^ so this will block until some worker has read from 'out'
readCountLimit--
}
}()
return out
}
用法:
type MyStruct struct {
// put your data fields here
}
ch := make(chan MyStruct) // <- original channel - used by producer to write to
rch := relayer(ch) // <- relay channel - used to read from
// consumers
go worker("worker 1", rch)
go worker("worker 2", rch)
// producer
for { ch <- MyStruct{} }
我们如何在 go 频道上设置监听器之类的东西,当有人从频道中阅读某些内容时通知我们?
想象一下,我们有一个 sequence number
用于通道条目,我们想在有人从我们的包中某处的通道读取值时减少它。
您可以在手动模式下完成。对消息实施某种 ACK
标记。
像这样:
type Msg struct {
Data int
ack bool
}
func (m *Msg) Ack() {
m.ack = true
}
func (m *Msg) Acked() bool {
return m.ack
}
func main() {
ch := make(chan *Msg)
msg := &Msg{Data: 1}
go func() {
for {
if msg.Acked() {
// do smth
}
time.Sleep(10 * time.Second)
}
}()
ch <- msg
for msg := range ch {
msg.Ack()
}
}
代码未经测试。
您还可以在 Ack()
方法中添加一些额外的信息,比如关于 package 和 func 的元信息,从哪里调用 Ack()
,这个答案可能是相关的:
无缓冲通道同步传递数据,因此您已经知道何时读取数据。当缓冲区已满时,缓冲通道的工作方式类似,但在其他情况下它们不会阻塞,因此这种方法不会告诉您完全相同的事情。根据您的实际需求,还可以考虑使用 sync.WaitGroup.
等工具ch = make(chan Data)
⋮
for {
⋮
// make data available
ch <- data
// now you know it was read
sequenceNumber--
⋮
}
您可以创建通道中继机制,以实时捕获读取事件。
例如:
func relayer(in <-chan MyStruct) <-chan MyStruct {
out := make(chan MyStruct) // non-buffered chan (see below)
go func() {
defer close(out)
readCountLimit := 10
for item := range in {
out <- item
// ^^^^ so this will block until some worker has read from 'out'
readCountLimit--
}
}()
return out
}
用法:
type MyStruct struct {
// put your data fields here
}
ch := make(chan MyStruct) // <- original channel - used by producer to write to
rch := relayer(ch) // <- relay channel - used to read from
// consumers
go worker("worker 1", rch)
go worker("worker 2", rch)
// producer
for { ch <- MyStruct{} }