将数据从一个 goroutine 发送到多个其他 goroutine
Sending data from one goroutine to multiple other goroutines
在项目中程序通过websocket接收数据。这个数据需要经过n种算法处理。算法数量可以动态变化。
我的尝试是创建一些 pub/sub 模式,可以在其中即时启动和取消订阅。事实证明,这比预期的更具挑战性。
这是我想出的(基于 https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):
package pubsub
import (
"context"
"sync"
"time"
)
type Pubsub struct {
sync.RWMutex
subs []*Subsciption
closed bool
}
func New() *Pubsub {
ps := &Pubsub{}
ps.subs = []*Subsciption{}
return ps
}
func (ps *Pubsub) Publish(msg interface{}) {
ps.RLock()
defer ps.RUnlock()
if ps.closed {
return
}
for _, sub := range ps.subs {
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
}
}
func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
ps.Lock()
defer ps.Unlock()
// prep channel
ctx, cancel := context.WithCancel(context.Background())
sub := &Subsciption{
Data: make(chan interface{}, 1),
cancel: cancel,
ps: ps,
}
// prep subsciption
ps.subs = append(ps.subs, sub)
return ctx, sub, nil
}
func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
ps.Lock()
defer ps.Unlock()
found := false
index := 0
for i, sub := range ps.subs {
if sub == s {
index = i
found = true
}
}
if found {
s.cancel()
ps.subs[index] = ps.subs[len(ps.subs)-1]
ps.subs = ps.subs[:len(ps.subs)-1]
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
}
return found
}
func (ps *Pubsub) Close() {
ps.Lock()
defer ps.Unlock()
if !ps.closed {
ps.closed = true
for _, sub := range ps.subs {
sub.cancel()
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(sub.Data)
}
}
}
type Subsciption struct {
Data chan interface{}
cancel func()
ps *Pubsub
}
func (s *Subsciption) Unsubscribe() {
s.ps.unsubscribe(s)
}
如评论中所述,这有(至少)两个问题:
问题 1:
在 运行 实施一段时间后,我遇到了一些此类错误:
goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
/home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
/home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb
在没有真正理解这一点的情况下,在我看来 Publish()
中创建的 goroutines 执行 accumulate/leak。这是正确的吗?我做错了什么?
问题 2:
当我通过 Unsubscribe()
结束订阅时,Publish()
试图写入已关闭的频道并发生恐慌。为了缓解这种情况,我创建了一个 goroutine 来延迟关闭通道。这感觉真的不符合最佳实践,但我无法找到合适的解决方案。执行此操作的确定性方法是什么?
这里有一个供您测试的小操场:https://play.golang.org/p/K-L8vLjt7_9
在深入探讨您的解决方案及其问题之前,让我再次推荐此答案中介绍的另一种 Broker 方法:
现在开始您的解决方案。
每当您启动一个 goroutine 时,请始终考虑它将如何结束,并确保如果 goroutine 在您的应用程序的生命周期中不应该 运行。
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
这个 goroutine 试图在 ch
上发送一个值。这可能是一个阻塞操作:如果 ch
的缓冲区已满并且 ch
上没有就绪的接收器,它将阻塞。这超出了启动的 goroutine 的控制范围,也超出了 pubsub
包的控制范围。在某些情况下这可能没问题,但这已经给包的用户带来了负担。尽量避免这些。尝试创建易于使用且不易被滥用的 API。
此外,启动一个 goroutine 只是为了在通道上发送一个值是一种资源浪费(goroutine 既便宜又轻便,但你不应该尽可能地向它们发送垃圾邮件)。
你这样做是因为你不想被屏蔽。为避免阻塞,您可以使用具有“合理”高缓冲区的缓冲通道。是的,这并不能解决阻塞问题,只能帮助“缓慢”的客户端从频道接收。
要在不启动 goroutine 的情况下“真正”避免阻塞,您可以使用非阻塞发送:
select {
case ch <- msg:
default:
// ch's buffer is full, we cannot deliver now
}
如果 ch
上的发送可以继续,它就会发生。如果不是,则立即选择 default
分支。你必须决定然后做什么。 “丢失”消息是否可以接受?等待一段时间直到“放弃”是否可以接受?或者是否可以启动一个 goroutine 来执行此操作(但随后您将回到我们正在尝试解决的问题)?或者在客户端可以从通道接收之前被阻止是否可以接受...
选择一个合理的高缓冲区,如果遇到仍然满的情况,阻塞直到客户端可以前进并接收消息是可以接受的。如果不能,那么您的整个应用程序可能处于不可接受的状态,“挂起”或“崩溃”可能是可以接受的。
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
关闭通道是向接收者发出的信号,表明通道上不会再发送任何值。因此,关闭通道始终应该是发送者的工作(和责任)。启动一个 goroutine 来关闭通道,你把那个工作和责任“交给”另一个不会同步给发送者的“实体”(一个 goroutine)。这很容易导致恐慌(在关闭的通道上发送是 运行 时间恐慌,其他公理见 )。不要那样做。
是的,这是必要的,因为您启动了要发送的 goroutines。如果你不这样做,那么你可以在不启动 goroutine 的情况下“就地”关闭,因为这样发送者和关闭者将是同一个实体:Pubsub
本身,其发送和关闭操作受到保护通过互斥锁。所以解决了第一个问题自然也就解决了第二个问题
一般情况下,如果一个频道有多个发送者,那么关闭频道必须协调。必须有一个实体(通常不是任何发送者)等待所有发送者完成,实际上使用 sync.WaitGroup
,然后该实体可以安全地关闭通道。参见 。
在项目中程序通过websocket接收数据。这个数据需要经过n种算法处理。算法数量可以动态变化。
我的尝试是创建一些 pub/sub 模式,可以在其中即时启动和取消订阅。事实证明,这比预期的更具挑战性。
这是我想出的(基于 https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/):
package pubsub
import (
"context"
"sync"
"time"
)
type Pubsub struct {
sync.RWMutex
subs []*Subsciption
closed bool
}
func New() *Pubsub {
ps := &Pubsub{}
ps.subs = []*Subsciption{}
return ps
}
func (ps *Pubsub) Publish(msg interface{}) {
ps.RLock()
defer ps.RUnlock()
if ps.closed {
return
}
for _, sub := range ps.subs {
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
}
}
func (ps *Pubsub) Subscribe() (context.Context, *Subsciption, error) {
ps.Lock()
defer ps.Unlock()
// prep channel
ctx, cancel := context.WithCancel(context.Background())
sub := &Subsciption{
Data: make(chan interface{}, 1),
cancel: cancel,
ps: ps,
}
// prep subsciption
ps.subs = append(ps.subs, sub)
return ctx, sub, nil
}
func (ps *Pubsub) unsubscribe(s *Subsciption) bool {
ps.Lock()
defer ps.Unlock()
found := false
index := 0
for i, sub := range ps.subs {
if sub == s {
index = i
found = true
}
}
if found {
s.cancel()
ps.subs[index] = ps.subs[len(ps.subs)-1]
ps.subs = ps.subs[:len(ps.subs)-1]
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
}
return found
}
func (ps *Pubsub) Close() {
ps.Lock()
defer ps.Unlock()
if !ps.closed {
ps.closed = true
for _, sub := range ps.subs {
sub.cancel()
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(sub.Data)
}
}
}
type Subsciption struct {
Data chan interface{}
cancel func()
ps *Pubsub
}
func (s *Subsciption) Unsubscribe() {
s.ps.unsubscribe(s)
}
如评论中所述,这有(至少)两个问题:
问题 1:
在 运行 实施一段时间后,我遇到了一些此类错误:
goroutine 120624 [runnable]:
bm/internal/pubsub.(*Pubsub).Publish.func1(0x8586c0, 0xc00b44e880, 0xc008617740)
/home/X/Projects/bm/internal/pubsub/pubsub.go:30
created by bookmaker/internal/pubsub.(*Pubsub).Publish
/home/X/Projects/bm/internal/pubsub/pubsub.go:30 +0xbb
在没有真正理解这一点的情况下,在我看来 Publish()
中创建的 goroutines 执行 accumulate/leak。这是正确的吗?我做错了什么?
问题 2:
当我通过 Unsubscribe()
结束订阅时,Publish()
试图写入已关闭的频道并发生恐慌。为了缓解这种情况,我创建了一个 goroutine 来延迟关闭通道。这感觉真的不符合最佳实践,但我无法找到合适的解决方案。执行此操作的确定性方法是什么?
这里有一个供您测试的小操场:https://play.golang.org/p/K-L8vLjt7_9
在深入探讨您的解决方案及其问题之前,让我再次推荐此答案中介绍的另一种 Broker 方法:
现在开始您的解决方案。
每当您启动一个 goroutine 时,请始终考虑它将如何结束,并确保如果 goroutine 在您的应用程序的生命周期中不应该 运行。
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
这个 goroutine 试图在 ch
上发送一个值。这可能是一个阻塞操作:如果 ch
的缓冲区已满并且 ch
上没有就绪的接收器,它将阻塞。这超出了启动的 goroutine 的控制范围,也超出了 pubsub
包的控制范围。在某些情况下这可能没问题,但这已经给包的用户带来了负担。尽量避免这些。尝试创建易于使用且不易被滥用的 API。
此外,启动一个 goroutine 只是为了在通道上发送一个值是一种资源浪费(goroutine 既便宜又轻便,但你不应该尽可能地向它们发送垃圾邮件)。
你这样做是因为你不想被屏蔽。为避免阻塞,您可以使用具有“合理”高缓冲区的缓冲通道。是的,这并不能解决阻塞问题,只能帮助“缓慢”的客户端从频道接收。
要在不启动 goroutine 的情况下“真正”避免阻塞,您可以使用非阻塞发送:
select {
case ch <- msg:
default:
// ch's buffer is full, we cannot deliver now
}
如果 ch
上的发送可以继续,它就会发生。如果不是,则立即选择 default
分支。你必须决定然后做什么。 “丢失”消息是否可以接受?等待一段时间直到“放弃”是否可以接受?或者是否可以启动一个 goroutine 来执行此操作(但随后您将回到我们正在尝试解决的问题)?或者在客户端可以从通道接收之前被阻止是否可以接受...
选择一个合理的高缓冲区,如果遇到仍然满的情况,阻塞直到客户端可以前进并接收消息是可以接受的。如果不能,那么您的整个应用程序可能处于不可接受的状态,“挂起”或“崩溃”可能是可以接受的。
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
关闭通道是向接收者发出的信号,表明通道上不会再发送任何值。因此,关闭通道始终应该是发送者的工作(和责任)。启动一个 goroutine 来关闭通道,你把那个工作和责任“交给”另一个不会同步给发送者的“实体”(一个 goroutine)。这很容易导致恐慌(在关闭的通道上发送是 运行 时间恐慌,其他公理见
是的,这是必要的,因为您启动了要发送的 goroutines。如果你不这样做,那么你可以在不启动 goroutine 的情况下“就地”关闭,因为这样发送者和关闭者将是同一个实体:Pubsub
本身,其发送和关闭操作受到保护通过互斥锁。所以解决了第一个问题自然也就解决了第二个问题
一般情况下,如果一个频道有多个发送者,那么关闭频道必须协调。必须有一个实体(通常不是任何发送者)等待所有发送者完成,实际上使用 sync.WaitGroup
,然后该实体可以安全地关闭通道。参见