在一定时间内从 goroutine 接收值
Receiving values from goroutine for certain amount of time
我有一个 goroutine,它可以生成无限数量的值(每个值都比最后一个更合适),但是找到每个值所需的时间越来越长。我正在尝试找到一种方法来添加时间限制,比如 10 秒,之后我的函数会执行一些迄今为止收到的最佳值。
这是我当前的 "solution",使用频道和计时器:
// the goroutine which runs infinitely
// (or at least a very long time for high values of depth)
func runSearch(depth int, ch chan int) {
for i := 1; i <= depth; i++ {
fmt.Printf("Searching to depth %v\n", i)
ch <- search(i)
}
}
// consumes progressively better values until the channel is closed
func awaitBestResult(ch chan int) {
var result int
for result := range ch {
best = result
}
// do something with best result here
}
// run both consumer and producer
func main() {
timer := time.NewTimer(time.Millisecond * 2000)
ch := make(chan int)
go runSearch(1000, ch)
go awaitBestResult(ch)
<-timer.C
close(ch)
}
这最有效 - 最好的结果是在计时器结束和频道关闭后处理。然而,我从 runSearch
goroutine 中得到一个恐慌(panic: send on closed channel
),因为主函数已经关闭了通道。
如何在计时器完成后停止第一个 goroutine 运行?非常感谢任何帮助。
您感到恐慌,因为您的发送 goroutine runSearch
显然超过了计时器并且它正试图在您的 main
goroutine 已经关闭的通道上发送一个值。您需要设计一种方法来向发送例程发出信号,一旦您的计时器失效并且在您关闭 main 中的通道之前不要发送任何值。另一方面,如果您的搜索更快结束,您还需要与 main
联系才能继续。您可以使用一个通道并进行同步,这样就没有竞争条件。最后,您需要知道您的消费者何时处理完所有数据,然后才能退出 main。
这里有些东西可能会有所帮助。
package main
import (
"fmt"
"sync"
"time"
)
var mu sync.Mutex //To protect the stopped variable which will decide if a value is to be sent on the signalling channel
var stopped bool
func search(i int) int {
time.Sleep(1 * time.Millisecond)
return (i + 1)
}
// (or at least a very long time for high values of depth)
func runSearch(depth int, ch chan int, stopSearch chan bool) {
for i := 1; i <= depth; i++ {
fmt.Printf("Searching to depth %v\n", i)
n := search(i)
select {
case <-stopSearch:
fmt.Println("Timer over! Searched till ", i)
return
default:
}
ch <- n
fmt.Printf("Sent depth %v result for processing\n", i)
}
mu.Lock() //To avoid race condition with timer also being
//completed at the same time as execution of this code
if stopped == false {
stopped = true
stopSearch <- true
fmt.Println("Search completed")
}
mu.Unlock()
}
// consumes progressively better values until the channel is closed
func awaitBestResult(ch chan int, doneProcessing chan bool) {
var best int
for result := range ch {
best = result
}
fmt.Println("Best result ", best)
// do something with best result here
//and communicate to main when you are done processing the result
doneProcessing <- true
}
func main() {
doneProcessing := make(chan bool)
stopSearch := make(chan bool)
// timer := time.NewTimer(time.Millisecond * 2000)
timer := time.NewTimer(time.Millisecond * 12)
ch := make(chan int)
go runSearch(1000, ch, stopSearch)
go awaitBestResult(ch, doneProcessing)
select {
case <-timer.C:
//If at the same time runsearch is also completed and trying to send a value !
//So we hold a lock before sending value on the channel
mu.Lock()
if stopped == false {
stopped = true
stopSearch <- true
fmt.Println("Timer expired")
}
mu.Unlock()
case <-stopSearch:
fmt.Println("runsearch goroutine completed")
}
close(ch)
//Wait for your consumer to complete processing
<-doneProcessing
//Safe to exit now
}
在 playground。更改 timer
的值以观察这两种情况。
您需要确保 goroutine 知道它何时完成处理,这样它就不会尝试写入已关闭的通道并发生恐慌。
这听起来像是 context
包的完美案例:
func runSearch(ctx context.Context, depth int, ch chan int) {
for i := 1; i <= depth; i++ {
select {
case <- ctx.Done()
// Context cancelled, return
return
default:
}
fmt.Printf("Searching to depth %v\n", i)
ch <- search(i)
}
}
然后在main()
:
// run both consumer and producer
func main() {
ctx := context.WithTimeout(context.Background, 2 * time.Second)
ch := make(chan int)
go runSearch(ctx, 1000, ch)
go awaitBestResult(ch)
close(ch)
}
我有一个 goroutine,它可以生成无限数量的值(每个值都比最后一个更合适),但是找到每个值所需的时间越来越长。我正在尝试找到一种方法来添加时间限制,比如 10 秒,之后我的函数会执行一些迄今为止收到的最佳值。
这是我当前的 "solution",使用频道和计时器:
// the goroutine which runs infinitely
// (or at least a very long time for high values of depth)
func runSearch(depth int, ch chan int) {
for i := 1; i <= depth; i++ {
fmt.Printf("Searching to depth %v\n", i)
ch <- search(i)
}
}
// consumes progressively better values until the channel is closed
func awaitBestResult(ch chan int) {
var result int
for result := range ch {
best = result
}
// do something with best result here
}
// run both consumer and producer
func main() {
timer := time.NewTimer(time.Millisecond * 2000)
ch := make(chan int)
go runSearch(1000, ch)
go awaitBestResult(ch)
<-timer.C
close(ch)
}
这最有效 - 最好的结果是在计时器结束和频道关闭后处理。然而,我从 runSearch
goroutine 中得到一个恐慌(panic: send on closed channel
),因为主函数已经关闭了通道。
如何在计时器完成后停止第一个 goroutine 运行?非常感谢任何帮助。
您感到恐慌,因为您的发送 goroutine runSearch
显然超过了计时器并且它正试图在您的 main
goroutine 已经关闭的通道上发送一个值。您需要设计一种方法来向发送例程发出信号,一旦您的计时器失效并且在您关闭 main 中的通道之前不要发送任何值。另一方面,如果您的搜索更快结束,您还需要与 main
联系才能继续。您可以使用一个通道并进行同步,这样就没有竞争条件。最后,您需要知道您的消费者何时处理完所有数据,然后才能退出 main。
这里有些东西可能会有所帮助。
package main
import (
"fmt"
"sync"
"time"
)
var mu sync.Mutex //To protect the stopped variable which will decide if a value is to be sent on the signalling channel
var stopped bool
func search(i int) int {
time.Sleep(1 * time.Millisecond)
return (i + 1)
}
// (or at least a very long time for high values of depth)
func runSearch(depth int, ch chan int, stopSearch chan bool) {
for i := 1; i <= depth; i++ {
fmt.Printf("Searching to depth %v\n", i)
n := search(i)
select {
case <-stopSearch:
fmt.Println("Timer over! Searched till ", i)
return
default:
}
ch <- n
fmt.Printf("Sent depth %v result for processing\n", i)
}
mu.Lock() //To avoid race condition with timer also being
//completed at the same time as execution of this code
if stopped == false {
stopped = true
stopSearch <- true
fmt.Println("Search completed")
}
mu.Unlock()
}
// consumes progressively better values until the channel is closed
func awaitBestResult(ch chan int, doneProcessing chan bool) {
var best int
for result := range ch {
best = result
}
fmt.Println("Best result ", best)
// do something with best result here
//and communicate to main when you are done processing the result
doneProcessing <- true
}
func main() {
doneProcessing := make(chan bool)
stopSearch := make(chan bool)
// timer := time.NewTimer(time.Millisecond * 2000)
timer := time.NewTimer(time.Millisecond * 12)
ch := make(chan int)
go runSearch(1000, ch, stopSearch)
go awaitBestResult(ch, doneProcessing)
select {
case <-timer.C:
//If at the same time runsearch is also completed and trying to send a value !
//So we hold a lock before sending value on the channel
mu.Lock()
if stopped == false {
stopped = true
stopSearch <- true
fmt.Println("Timer expired")
}
mu.Unlock()
case <-stopSearch:
fmt.Println("runsearch goroutine completed")
}
close(ch)
//Wait for your consumer to complete processing
<-doneProcessing
//Safe to exit now
}
在 playground。更改 timer
的值以观察这两种情况。
您需要确保 goroutine 知道它何时完成处理,这样它就不会尝试写入已关闭的通道并发生恐慌。
这听起来像是 context
包的完美案例:
func runSearch(ctx context.Context, depth int, ch chan int) {
for i := 1; i <= depth; i++ {
select {
case <- ctx.Done()
// Context cancelled, return
return
default:
}
fmt.Printf("Searching to depth %v\n", i)
ch <- search(i)
}
}
然后在main()
:
// run both consumer and producer
func main() {
ctx := context.WithTimeout(context.Background, 2 * time.Second)
ch := make(chan int)
go runSearch(ctx, 1000, ch)
go awaitBestResult(ch)
close(ch)
}