带通道的 WaitGroup

WaitGroup with Channels

我一直在研究 this 并想出了:

type Function struct{
    Function func(*TaskGroup, []interface{})
    Args []interface{}
}

type TaskGroup struct{
    Group sync.WaitGroup
    Functions []Function
}

func (x *TaskGroup) Start() {
    for _, Function := range x.Functions{
        x.Group.Add(1)
        go Function.Function(x, Function.Args)
    }
    x.Group.Wait()
}

为了更轻松地使用多个函数,我必须等待。

以下测试将起作用,但我不明白为什么:

func auxC(x *TaskGroup, args []interface{}){
    defer x.Group.Done()
    messageOut := args[0].(chan string)
    messageOut <- "TestC"
}
func auxD(x *TaskGroup, args []interface{}){
    defer x.Group.Done()
    messageOut := args[0].(chan string)
    messageOut <- "TestD"
}

func TestTaskGroupBaseB(t *testing.T) {
    messageC := make(chan string, 1)
    messageD := make(chan string, 1)

    tg := TaskGroup{
        Functions: []Function{
            {auxC, []interface{}{messageC}},
            {auxD, []interface{}{messageD}},
        },
    }
    tg.Start()

    fmt.Println(<- messageC)
    fmt.Println(<- messageD)

    time.Sleep(100 * time.Millisecond)
}

我第一次尝试使用这样的无缓冲通道:

messageC := make(chan string)
messageD := make(chan string)

但它不起作用,它会永远卡住,什么也不做,所以我有几个问题:

  1. 为什么大小为 1 的缓冲通道有效,而无缓冲的通道无效?
  2. 默认大小 1 不是无缓冲的吗?

重构代码,见评论:

Main/Tests:

func auxC(args []interface{}){
    messageOut := args[0].(chan string)
    messageOut <- "TestC"
}
func auxD(args []interface{}){
    messageOut := args[0].(chan string)
    messageOut <- "TestD"
}

func TestTaskGroupBaseB(t *testing.T) {
    messageC := make(chan string,1)
    messageD := make(chan string,1)

    tg := TaskGroup{
        Functions: []Function{
            {auxC, []interface{}{messageC}},
            {auxD, []interface{}{messageD}},
        },
    }
    tg.Wait()

    fmt.Println(<- messageC)
    fmt.Println(<- messageD)

    time.Sleep(100 * time.Millisecond)
}

任务组:

type Function struct{
    Function func([]interface{})
    Args []interface{}
}

type TaskGroup struct{
    Group sync.WaitGroup
    Functions []Function
}

func (x *TaskGroup) Wait() {
    for _, function := range x.Functions{
        x.Group.Add(1)
        go func(x *TaskGroup, f Function){
            defer x.Group.Done()
            f.Function(f.Args)
        }(x, function)
    }
    x.Group.Wait()
}

使用缓冲区大小为1的通道,首先写入缓冲区数据,然后goroutines结束,你可以在主goroutine中读取缓冲数据。

当通道大小为零时,对通道的写入会阻塞,直到另一个 goroutine 从中读取。所以你的两个 goroutines 都在等待写入通道。如果您在通道读入 main 后移动 Wait() 调用,它应该可以工作。