在 Go 中通过多个频道广播一个频道

Broadcast a channel through multiple channel in Go

我想将从一个频道接收到的数据广播到一个频道列表。频道列表是动态的,可以在 运行 阶段修改。

作为 Go 的新开发人员,我编写了这段代码。我发现它对我想要的东西来说很重。有更好的方法吗?

package utils

import "sync"

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels
type StringChannelBroadcaster struct {
    Source      chan string
    Subscribers map[string]*StringChannelSubscriber
    stopChannel chan bool
    mutex       sync.Mutex
    capacity    uint64
}

// NewStringChannelBroadcaster creates a StringChannelBroadcaster
func NewStringChannelBroadcaster(capacity uint64) (b *StringChannelBroadcaster) {
    return &StringChannelBroadcaster{
        Source:      make(chan string, capacity),
        Subscribers: make(map[string]*StringChannelSubscriber),
        capacity:    capacity,
    }
}

// Dispatch starts dispatching message
func (b *StringChannelBroadcaster) Dispatch() {
    b.stopChannel = make(chan bool)
    for {
        select {
        case val, ok := <-b.Source:
            if ok {
                b.mutex.Lock()
                for _, value := range b.Subscribers {
                    value.Channel <- val
                }
                b.mutex.Unlock()
            }
        case <-b.stopChannel:
            return
        }
    }
}

// Stop stops the Broadcaster
func (b *StringChannelBroadcaster) Stop() {
    close(b.stopChannel)
}

// StringChannelSubscriber defines a subscriber to a StringChannelBroadcaster
type StringChannelSubscriber struct {
    Key     string
    Channel chan string
}

// NewSubscriber returns a new subsriber to the StringChannelBroadcaster
func (b *StringChannelBroadcaster) NewSubscriber() *StringChannelSubscriber {
    key := RandString(20)
    newSubscriber := StringChannelSubscriber{
        Key:     key,
        Channel: make(chan string, b.capacity),
    }
    b.mutex.Lock()
    b.Subscribers[key] = &newSubscriber
    b.mutex.Unlock()

    return &newSubscriber
}

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster
func (b *StringChannelBroadcaster) RemoveSubscriber(subscriber *StringChannelSubscriber) {
    b.mutex.Lock()
    delete(b.Subscribers, subscriber.Key)
    b.mutex.Unlock()
}

谢谢,

朱利安

我想你可以稍微简化一下:去掉 stopChannelStop 方法。您可以直接关闭 Source 而不是调用 Stop,并在 Dispatch 中检测到它(ok 将是 false)以退出(实际上您可以在源通道上进行范围)。

你可以去掉Dispatch,只在NewStringChannelBroadcaster中用for循环启动一个goroutine,这样外部代码就不必单独启动dispatch循环了。

您可以使用通道类型作为映射键,这样您的映射就可以变成 map[chan string]struct{}(空结构,因为您不需要映射值)。因此,您的 NewSubscriber 可以采用频道类型参数(或创建一个新频道并 return 它),并将其插入到地图中,您不需要随机字符串或 StringChannelSubscriber类型。

我也做了一些改进,比如关闭订阅频道:

package main

import "sync"

import (
    "fmt"
    "time"
)

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels
type StringChannelBroadcaster struct {
    Source      chan string
    Subscribers map[chan string]struct{}
    mutex       sync.Mutex
    capacity    uint64
}

// NewStringChannelBroadcaster creates a StringChannelBroadcaster
func NewStringChannelBroadcaster(capacity uint64) *StringChannelBroadcaster {
    b := &StringChannelBroadcaster{
        Source:      make(chan string, capacity),
        Subscribers: make(map[chan string]struct{}),
        capacity:    capacity,
    }
    go b.dispatch()
    return b
}

// Dispatch starts dispatching message
func (b *StringChannelBroadcaster) dispatch() {
    // for iterates until the channel is closed
    for val := range b.Source {
        b.mutex.Lock()
        for ch := range b.Subscribers {
            ch <- val
        }
        b.mutex.Unlock()
    }
    b.mutex.Lock()
    for ch := range b.Subscribers {
        close(ch)
        // you shouldn't be calling RemoveSubscriber after closing b.Source
        // but it's better to be safe than sorry
        delete(b.Subscribers, ch)
    }
    b.Subscribers = nil
    b.mutex.Unlock()
}

func (b *StringChannelBroadcaster) NewSubscriber() chan string {
    ch := make(chan string, b.capacity)
    b.mutex.Lock()
    if b.Subscribers == nil {
        panic(fmt.Errorf("NewSubscriber called on closed broadcaster"))
    }
    b.Subscribers[ch] = struct{}{}
    b.mutex.Unlock()

    return ch
}

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster
func (b *StringChannelBroadcaster) RemoveSubscriber(ch chan string) {
    b.mutex.Lock()
    if _, ok := b.Subscribers[ch]; ok {
        close(ch)                 // this line does have to be inside the if to prevent close of closed channel, in case RemoveSubscriber is called twice on the same channel
        delete(b.Subscribers, ch) // this line doesn't need to be inside the if
    }
    b.mutex.Unlock()
}

func main() {
    b := NewStringChannelBroadcaster(0)

    var toberemoved chan string

    for i := 0; i < 3; i++ {
        i := i

        ch := b.NewSubscriber()
        if i == 1 {
            toberemoved = ch
        }
        go func() {
            for v := range ch {
                fmt.Printf("receive %v: %v\n", i, v)
            }
            fmt.Printf("Exit %v\n", i)
        }()
    }

    b.Source <- "Test 1"
    b.Source <- "Test 2"
    // This is a race condition: the second reader may or may not receive the first two messages.
    b.RemoveSubscriber(toberemoved)
    b.Source <- "Test 3"

    // let the reader goroutines receive the last message
    time.Sleep(2 * time.Second)

    close(b.Source)

    // let the reader goroutines write close message
    time.Sleep(1 * time.Second)
}

https://play.golang.org/p/X-NcikvbDM

编辑:我添加了您的编辑以解决在关闭 Source 后调用 RemoveSubscriber 时出现的恐慌,但您不应该这样做,您应该让结构及其中的所有内容在通道关闭后被垃圾收集。 如果在关闭 Source 后调用它,我还向 NewSubscriber 添加了恐慌。以前你可以这样做,它会泄漏创建的通道和可能会永远阻塞在该通道上的 goroutine。

如果您可以在一个已经关闭的广播公司上调用 NewSubscriber(或 RemoveSubscriber),这可能意味着您的代码某处有错误,因为您持有一个广播公司,您不应该。