当通道关闭时,以接收通道作为参数的 goroutines 是否停止?
Do goroutines with receiving channel as parameter stop, when the channel is closed?
我一直在阅读 "Building microservices with go",这本书介绍了 apache/go-resiliency/deadline
处理超时的包。
deadline.go
// Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go.
package deadline
import (
"errors"
"time"
)
// ErrTimedOut is the error returned from Run when the deadline expires.
var ErrTimedOut = errors.New("timed out waiting for function to finish")
// Deadline implements the deadline/timeout resiliency pattern.
type Deadline struct {
timeout time.Duration
}
// New constructs a new Deadline with the given timeout.
func New(timeout time.Duration) *Deadline {
return &Deadline{
timeout: timeout,
}
}
// Run runs the given function, passing it a stopper channel. If the deadline passes before
// the function finishes executing, Run returns ErrTimeOut to the caller and closes the stopper
// channel so that the work function can attempt to exit gracefully. It does not (and cannot)
// simply kill the running function, so if it doesn't respect the stopper channel then it may
// keep running after the deadline passes. If the function finishes before the deadline, then
// the return value of the function is returned from Run.
func (d *Deadline) Run(work func(<-chan struct{}) error) error {
result := make(chan error)
stopper := make(chan struct{})
go func() {
result <- work(stopper)
}()
select {
case ret := <-result:
return ret
case <-time.After(d.timeout):
close(stopper)
return ErrTimedOut
}
}
deadline_test.go
package deadline
import (
"errors"
"testing"
"time"
)
func takesFiveMillis(stopper <-chan struct{}) error {
time.Sleep(5 * time.Millisecond)
return nil
}
func takesTwentyMillis(stopper <-chan struct{}) error {
time.Sleep(20 * time.Millisecond)
return nil
}
func returnsError(stopper <-chan struct{}) error {
return errors.New("foo")
}
func TestDeadline(t *testing.T) {
dl := New(10 * time.Millisecond)
if err := dl.Run(takesFiveMillis); err != nil {
t.Error(err)
}
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
t.Error(err)
}
if err := dl.Run(returnsError); err.Error() != "foo" {
t.Error(err)
}
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
<-stopper
close(done)
return nil
})
if err != ErrTimedOut {
t.Error(err)
}
<-done
}
func ExampleDeadline() {
dl := New(1 * time.Second)
err := dl.Run(func(stopper <-chan struct{}) error {
// do something possibly slow
// check stopper function and give up if timed out
return nil
})
switch err {
case ErrTimedOut:
// execution took too long, oops
default:
// some other error
}
}
第一个问题
// in deadline_test.go
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
t.Error(err)
}
我无法理解上述代码的执行流程。据我了解,由于takesTwentyMillis
函数休眠的时间比设置的超时时间10毫秒长,
// in deadline.go
case <-time.After(d.timeout):
close(stopper)
return ErrTimedOut
time.After 发出当前时间,并且选择了这种情况。然后停止器通道关闭,ErrTimeout 为 returned.
我不明白的是,关闭 stopper 通道对可能仍然是 运行ning[=70 的匿名 goroutine 做了什么=]
我认为,当 stopper 通道关闭时,下面的 goroutine 可能仍然是 运行ning.
go func() {
result <- work(stopper)
}()
(这里说错了请指正)我想在close(stopper)
之后,这个goroutine会调用takesTwentyMillis
(=work function),参数是stopper channel。该函数将继续并休眠 20 毫秒,然后 return nil 传递给结果通道。 main() 到此结束,对吧?
我不明白在这里关闭塞子通道有什么意义。 takesTwentyMillis
函数似乎并没有在函数体内使用通道:(.
第二题
// in deadline_test.go within TestDeadline()
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
<-stopper
close(done)
return nil
})
if err != ErrTimedOut {
t.Error(err)
}
<-done
这是我不完全理解的部分。我认为当 dl.Run
是 运行 时,停止通道被初始化。但是因为 stopper 通道中没有值,函数调用将被阻塞在 <-stopper
...但是因为我不理解这段代码,所以我不明白为什么这段代码首先存在(即什么此代码正在尝试测试,以及它是如何执行的,等等)。
关于第二个问题的第三个(附加)问题
所以我理解当第二个问题中的Run
函数触发stopper channel关闭时,worker函数得到了信号。然后工作人员关闭完成的通道并且 returns 为零。
我使用 delve(=go debugger) 来查看这一点,gdb 将我带到行 return nil
之后的 deadline.go
中的 goroutine。
err := dl.Run(func(stopper <-chan struct{}) error {
<-stopper
close(done)
--> return nil
})
输入 n 跳到下一行后,delve 带我到这里
go func() {
--> result <- work(stopper)
}()
过程在这里结束了,因为当我再次输入 n 时,命令行提示 PASS 并且过程退出。为什么这个过程在这里结束? work(stopper)
似乎是 return nil
,然后应该将其传递给结果通道,对吗?但是由于某些原因,这一行似乎没有执行。
我知道主要的 goroutine,也就是 Run
函数,已经 returned ErrTimedOut。所以我想这与此有关?
第一题
使用 stopper
通道是为了发出信号,例如takesTwentyMillis
截止日期已到,调用者不再关心其结果。通常这意味着像 takesTwentyMillis
这样的工作函数应该检查 stopper
通道是否已经关闭,以便它可以取消它的工作。不过,检查 stopper
通道是工作函数的选择。它可能会或可能不会检查频道。
func takesTwentyMillis(stopper <-chan struct{}) error {
for i := 0; i < 20; i++ {
select {
case <-stopper:
// caller doesn't care anymore might as well stop working
return nil
case <-time.After(time.Second): // simulating work
}
}
// work is done
return nil
}
第二题
这部分Deadline.Run()
将关闭塞子通道
case <-time.After(d.timeout):
close(stopper)
在关闭的频道 (<-stopper
) 上读取将立即 return 该频道的零值。我认为它只是在测试最终超时的工作函数。
我一直在阅读 "Building microservices with go",这本书介绍了 apache/go-resiliency/deadline
处理超时的包。
deadline.go
// Package deadline implements the deadline (also known as "timeout") resiliency pattern for Go.
package deadline
import (
"errors"
"time"
)
// ErrTimedOut is the error returned from Run when the deadline expires.
var ErrTimedOut = errors.New("timed out waiting for function to finish")
// Deadline implements the deadline/timeout resiliency pattern.
type Deadline struct {
timeout time.Duration
}
// New constructs a new Deadline with the given timeout.
func New(timeout time.Duration) *Deadline {
return &Deadline{
timeout: timeout,
}
}
// Run runs the given function, passing it a stopper channel. If the deadline passes before
// the function finishes executing, Run returns ErrTimeOut to the caller and closes the stopper
// channel so that the work function can attempt to exit gracefully. It does not (and cannot)
// simply kill the running function, so if it doesn't respect the stopper channel then it may
// keep running after the deadline passes. If the function finishes before the deadline, then
// the return value of the function is returned from Run.
func (d *Deadline) Run(work func(<-chan struct{}) error) error {
result := make(chan error)
stopper := make(chan struct{})
go func() {
result <- work(stopper)
}()
select {
case ret := <-result:
return ret
case <-time.After(d.timeout):
close(stopper)
return ErrTimedOut
}
}
deadline_test.go
package deadline
import (
"errors"
"testing"
"time"
)
func takesFiveMillis(stopper <-chan struct{}) error {
time.Sleep(5 * time.Millisecond)
return nil
}
func takesTwentyMillis(stopper <-chan struct{}) error {
time.Sleep(20 * time.Millisecond)
return nil
}
func returnsError(stopper <-chan struct{}) error {
return errors.New("foo")
}
func TestDeadline(t *testing.T) {
dl := New(10 * time.Millisecond)
if err := dl.Run(takesFiveMillis); err != nil {
t.Error(err)
}
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
t.Error(err)
}
if err := dl.Run(returnsError); err.Error() != "foo" {
t.Error(err)
}
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
<-stopper
close(done)
return nil
})
if err != ErrTimedOut {
t.Error(err)
}
<-done
}
func ExampleDeadline() {
dl := New(1 * time.Second)
err := dl.Run(func(stopper <-chan struct{}) error {
// do something possibly slow
// check stopper function and give up if timed out
return nil
})
switch err {
case ErrTimedOut:
// execution took too long, oops
default:
// some other error
}
}
第一个问题
// in deadline_test.go
if err := dl.Run(takesTwentyMillis); err != ErrTimedOut {
t.Error(err)
}
我无法理解上述代码的执行流程。据我了解,由于takesTwentyMillis
函数休眠的时间比设置的超时时间10毫秒长,
// in deadline.go
case <-time.After(d.timeout):
close(stopper)
return ErrTimedOut
time.After 发出当前时间,并且选择了这种情况。然后停止器通道关闭,ErrTimeout 为 returned.
我不明白的是,关闭 stopper 通道对可能仍然是 运行ning[=70 的匿名 goroutine 做了什么=] 我认为,当 stopper 通道关闭时,下面的 goroutine 可能仍然是 运行ning.
go func() {
result <- work(stopper)
}()
(这里说错了请指正)我想在close(stopper)
之后,这个goroutine会调用takesTwentyMillis
(=work function),参数是stopper channel。该函数将继续并休眠 20 毫秒,然后 return nil 传递给结果通道。 main() 到此结束,对吧?
我不明白在这里关闭塞子通道有什么意义。 takesTwentyMillis
函数似乎并没有在函数体内使用通道:(.
第二题
// in deadline_test.go within TestDeadline()
done := make(chan struct{})
err := dl.Run(func(stopper <-chan struct{}) error {
<-stopper
close(done)
return nil
})
if err != ErrTimedOut {
t.Error(err)
}
<-done
这是我不完全理解的部分。我认为当 dl.Run
是 运行 时,停止通道被初始化。但是因为 stopper 通道中没有值,函数调用将被阻塞在 <-stopper
...但是因为我不理解这段代码,所以我不明白为什么这段代码首先存在(即什么此代码正在尝试测试,以及它是如何执行的,等等)。
关于第二个问题的第三个(附加)问题
所以我理解当第二个问题中的Run
函数触发stopper channel关闭时,worker函数得到了信号。然后工作人员关闭完成的通道并且 returns 为零。
我使用 delve(=go debugger) 来查看这一点,gdb 将我带到行 return nil
之后的 deadline.go
中的 goroutine。
err := dl.Run(func(stopper <-chan struct{}) error {
<-stopper
close(done)
--> return nil
})
输入 n 跳到下一行后,delve 带我到这里
go func() {
--> result <- work(stopper)
}()
过程在这里结束了,因为当我再次输入 n 时,命令行提示 PASS 并且过程退出。为什么这个过程在这里结束? work(stopper)
似乎是 return nil
,然后应该将其传递给结果通道,对吗?但是由于某些原因,这一行似乎没有执行。
我知道主要的 goroutine,也就是 Run
函数,已经 returned ErrTimedOut。所以我想这与此有关?
第一题
使用 stopper
通道是为了发出信号,例如takesTwentyMillis
截止日期已到,调用者不再关心其结果。通常这意味着像 takesTwentyMillis
这样的工作函数应该检查 stopper
通道是否已经关闭,以便它可以取消它的工作。不过,检查 stopper
通道是工作函数的选择。它可能会或可能不会检查频道。
func takesTwentyMillis(stopper <-chan struct{}) error {
for i := 0; i < 20; i++ {
select {
case <-stopper:
// caller doesn't care anymore might as well stop working
return nil
case <-time.After(time.Second): // simulating work
}
}
// work is done
return nil
}
第二题
这部分Deadline.Run()
将关闭塞子通道
case <-time.After(d.timeout):
close(stopper)
在关闭的频道 (<-stopper
) 上读取将立即 return 该频道的零值。我认为它只是在测试最终超时的工作函数。