当通道关闭时,以接收通道作为参数的 goroutines 是否停止?

Do goroutines with receiving channel as parameter stop, when the channel is closed?

我一直在阅读 "Building microservices with go",这本书介绍了 apache/go-resiliency/deadline 处理超时的包。

deadline.go

// Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go.
package deadline

import (
    "errors"
    "time"
)

// ErrTimedOut is the error returned from Run when the deadline expires.
var ErrTimedOut = errors.New("timed out waiting for function to finish")

// Deadline implements the deadline/timeout resiliency pattern.
type Deadline struct {
    timeout time.Duration
}

// New constructs a new Deadline with the given timeout.
func New(timeout time.Duration) *Deadline {
    return &Deadline{
        timeout: timeout,
    }
}

// Run runs the given function, passing it a stopper channel. If the deadline passes before
// the function finishes executing, Run returns ErrTimeOut to the caller and closes the stopper
// channel so that the work function can attempt to exit gracefully. It does not (and cannot)
// simply kill the running function, so if it doesn't respect the stopper channel then it may
// keep running after the deadline passes. If the function finishes before the deadline, then
// the return value of the function is returned from Run.
func (d *Deadline) Run(work func(<-chan struct{}) error) error {
    result := make(chan error)
    stopper := make(chan struct{})

    go func() {
        result <- work(stopper)
    }()

    select {
    case ret := <-result:
        return ret
    case <-time.After(d.timeout):
        close(stopper)
        return ErrTimedOut
    }
}

deadline_test.go

package deadline

import (
    "errors"
    "testing"
    "time"
)

func takesFiveMillis(stopper <-chan struct{}) error {
    time.Sleep(5 * time.Millisecond)
    return nil
}

func takesTwentyMillis(stopper <-chan struct{}) error {
    time.Sleep(20 * time.Millisecond)
    return nil
}

func returnsError(stopper <-chan struct{}) error {
    return errors.New("foo")
}

func TestDeadline(t *testing.T) {
    dl := New(10 * time.Millisecond)

    if err := dl.Run(takesFiveMillis); err != nil {
        t.Error(err)
    }

    if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
        t.Error(err)
    }

    if err := dl.Run(returnsError); err.Error() != "foo" {
        t.Error(err)
    }

    done := make(chan struct{})
    err := dl.Run(func(stopper <-chan struct{}) error {
        <-stopper
        close(done)
        return nil
    })
    if err != ErrTimedOut {
        t.Error(err)
    }
    <-done
}

func ExampleDeadline() {
    dl := New(1 * time.Second)

    err := dl.Run(func(stopper <-chan struct{}) error {
        // do something possibly slow
        // check stopper function and give up if timed out
        return nil
    })

    switch err {
    case ErrTimedOut:
        // execution took too long, oops
    default:
        // some other error
    }
}

第一个问题

// in deadline_test.go
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
    t.Error(err)
}  

我无法理解上述代码的执行流程。据我了解,由于takesTwentyMillis函数休眠的时间比设置的超时时间10毫秒长,

// in deadline.go
case <-time.After(d.timeout):
    close(stopper)
    return ErrTimedOut

time.After 发出当前时间,并且选择了这种情况。然后停止器通道关闭,ErrTimeout 为 returned.

我不明白的是,关闭 stopper 通道对可能仍然是 运行ning[=70 的匿名 goroutine 做了什么=] 我认为,当 stopper 通道关闭时,下面的 goroutine 可能仍然是 运行ning.

go func() {
        result <- work(stopper)
    }()

(这里说错了请指正)我想在close(stopper)之后,这个goroutine会调用takesTwentyMillis(=work function),参数是stopper channel。该函数将继续并休眠 20 毫秒,然后 return nil 传递给结果通道。 main() 到此结束,对吧?

我不明白在这里关闭塞子通道有什么意义。 takesTwentyMillis 函数似乎并没有在函数体内使用通道:(.

第二题

// in deadline_test.go within TestDeadline()
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
    <-stopper
    close(done)
    return nil
})
if err != ErrTimedOut {
    t.Error(err)
}
<-done

这是我不完全理解的部分。我认为当 dl.Run 是 运行 时,停止通道被初始化。但是因为 stopper 通道中没有值,函数调用将被阻塞在 <-stopper...但是因为我不理解这段代码,所以我不明白为什么这段代码首先存在(即什么此代码正在尝试测试,以及它是如何执行的,等等)。


关于第二个问题的第三个(附加)问题

所以我理解当第二个问题中的Run函数触发stopper channel关闭时,worker函数得到了信号。然后工作人员关闭完成的通道并且 returns 为零。 我使用 delve(=go debugger) 来查看这一点,gdb 将我带到行 return nil 之后的 deadline.go 中的 goroutine。

   err := dl.Run(func(stopper <-chan struct{}) error {
        <-stopper
        close(done)
-->     return nil   
    })

输入 n 跳到下一行后,delve 带我到这里

    go func() {
-->             result <- work(stopper)
            }()

过程在这里结束了,因为当我再次输入 n 时,命令行提示 PASS 并且过程退出。为什么这个过程在这里结束? work(stopper) 似乎是 return nil,然后应该将其传递给结果通道,对吗?但是由于某些原因,这一行似乎没有执行。

我知道主要的 goroutine,也就是 Run 函数,已经 returned ErrTimedOut。所以我想这与此有关?

第一题

使用 stopper 通道是为了发出信号,例如takesTwentyMillis 截止日期已到,调用者不再关心其结果。通常这意味着像 takesTwentyMillis 这样的工作函数应该检查 stopper 通道是否已经关闭,以便它可以取消它的工作。不过,检查 stopper 通道是工作函数的选择。它可能会或可能不会检查频道。

func takesTwentyMillis(stopper <-chan struct{}) error {
    for i := 0; i < 20; i++ {
        select {
        case <-stopper:
            // caller doesn't care anymore might as well stop working
            return nil
        case <-time.After(time.Second): // simulating work
        }
    }
    // work is done
    return nil
}

第二题

这部分Deadline.Run()将关闭塞子通道

case <-time.After(d.timeout):
    close(stopper)

在关闭的频道 (<-stopper) 上读取将立即 return 该频道的零值。我认为它只是在测试最终超时的工作函数。