在没有接收方的情况下,是否可以保留数据打开的缓冲通道?

Is it OK to leave a buffered channel with data open when there is no receiver?

假设一个频道有10个发送者和一个接收者。发送方函数需要一些时间才能获得 return 值。接收者只希望从通道中得到一个值(第一个接收到的值),其他 9 个值不用。接收方不需要等待剩下的 9 个值。这就是为什么我没有使用 sync.WaitGroup.

我用的是buffered channel,所以当receiver只取第一个时,buffered channel中会有9个数据。我的问题是:

  1. 在没有接收者的情况下,是否可以保留数据打开的缓冲通道?下面的示例代码是一个简化的代码,但是如果程序是守护进程,它最终会被垃圾回收吗?

  2. 有没有更好的方法来处理这种情况?我尝试使用取消通道但失败了。我不确定 context 是否适合这种情况。

示例代码:

package main

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    i, err := getRandomInt()
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(i)
    }

    fmt.Println("Waiting goroutines to be finished...")
    time.Sleep(2 * time.Second)
}

func getRandomInt() (int, error) {
    ch := make(chan int, 10)

    // 10 senders
    for i := 0; i < 10; i++ {
        go func(i int) {
            defer fmt.Printf("Goroutine #%d finished\n", i)
            fmt.Printf("Goroutine #%d started\n", i)

            data := heavyJob()
            ch <- data
            fmt.Printf("Goroutine #%d sent data %d to ch\n", i, data)
            return
        }(i)
    }

    // 1 receiver
    timeout := time.After(2000 * time.Millisecond)
    for {
        select {
        case value := <-ch:
            // uses only the value received first, the rest are discarded
            return value, nil
        case <-timeout:
            return -1, errors.New("Timeout")
        }
    }
}

// takes 1900~2900ms to finish
func heavyJob() int {
    r := rand.Intn(1000)
    time.Sleep(time.Duration(r+1900) * time.Millisecond)
    return r
}

Run on playground

回答主要问题:

  1. 留个频道就好了,会被垃圾回收的
  2. 这似乎是基于意见,但对我来说,如果你创建一个有 10 个空格的缓冲通道的唯一原因是为了让发送者 goroutines 可以退出;感觉它会从重新设计中受益。还有其他(也许更好)的方法可以确保发送者 goroutines 可以关闭。

本质上,您正在创建一个隐式 worker 数量和缓冲通道大小之间的耦合。更改这两个数字中的一个,就会出现死锁/中断! (作为旁注,缓冲通道通常用于消费者和生产者以相同速率工作但输出不稳定的情况。它是尖锐的,缓冲平滑了波峰和波谷。)

考虑到这一点,我建议最好明确管理您不想要所有值的事实。

这是 getRandomInt() 函数的更新版本。请注意在顶部使用延迟设置上下文取消,以及在发送时使用 select 语句。

func getRandomInt() (int, error) {
    ctx := context.Background() // creates a fresh, empty context
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // cancels the context when getRandomInt() returns

    ch := make(chan int)

    // 10 senders
    for i := 0; i < 10; i++ {
        go func(i int) {
            defer fmt.Printf("Goroutine #%d finished\n", i)
            fmt.Printf("Goroutine #%d started\n", i)

            data := heavyJob()

            // this select statement wil block until either this goroutine 
            // is the first to send, or the context is cancelled. In which case
            // another routine has already sent and it can discard it's values.
            select { 
            case ch <- data:
                fmt.Printf("Goroutine #%d sent data %d to ch\n", i, data)
            case <-ctx.Done():
                fmt.Printf("Goroutine #%d did not send, context is cancelled, would have sent data %d to ch\n", i, data)
            }
        }(i)
    }

    // 1 receiver
    timeout := time.After(2000 * time.Millisecond)
    select {
    case value := <-ch:
        // uses only the value received first, the rest are discarded
        return value, nil
    case <-timeout:
        return -1, errors.New("Timeout")
    }
}

使用取消设置上下文意味着一旦调用 cancel() 函数,上下文就会变为 "Done"。这是一种告诉所有发送者 goroutines 不要费心等待发送的方法。

发送时,select 语句阻塞,直到上下文被 cancel() 函数取消;或者接收方方法读取第一个值。

我还从频道中删除了缓冲,因为不再需要它了。