运行 永远连续最多两个go routines
Running a maximum of two go routines continuously forever
我正在尝试同时 运行 一个函数。它调用我的数据库可能需要 2-10 秒。我希望它在完成后继续执行下一个例程,即使另一个例程仍在处理中,但只希望它一次最多处理 2 个。我希望这种情况无限期地发生。我觉得我 几乎 在那里,但是 waitGroup 强制两个例程在继续另一个迭代之前等待直到完成。
const ROUTINES = 2;
for {
var wg sync.WaitGroup
_, err:= db.Exec(`Random DB Call`)
if err != nil {
panic(err)
}
ch := createRoutines(db, &wg)
wg.Add(ROUTINES)
for i := 1; i <= ROUTINES; i++ {
ch <- i
time.Sleep(2 * time.Second)
}
close(ch)
wg.Wait()
}
func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
var ch = make(chan int, 5)
for i := 0; i < ROUTINES ; i++ {
go func(db *sqlx.DB) {
defer wg.Done()
for {
_, ok := <-ch
if !ok {
return
}
doStuff(db)
}
}(db)
}
return ch
}
这增加了一个外部依赖,但考虑这个实现:
package main
import (
"context"
"database/sql"
"log"
"github.com/MicahParks/ctxerrpool"
)
func main() {
// Create a pool of 2 workers for database queries. Log any errors.
databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
log.Printf("Failed to execute database query.\nError: %s", err.Error())
})
// Get a list of queries to execute.
queries := []string{
"SELECT first_name, last_name FROM customers",
"SELECT price FROM inventory WHERE sku='1234'",
"other queries...",
}
// TODO Make a database connection.
var db *sql.DB
for _, query := range queries {
// Intentionally shadow the looped variable for scope.
query := query
// Perform the query on a worker. If no worker is ready, it will block until one is.
databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
_, err = db.ExecContext(workCtx, query)
return err
})
}
// Wait for all workers to finish.
databasePool.Wait()
}
如果您只需要同时拥有 n 个 goroutines 运行,您可以有一个大小为 n 的缓冲通道,并在没有 space 时使用它来阻止创建新的 goroutines ] 左边是这样的
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
const ROUTINES = 2
rand.Seed(time.Now().UnixNano())
stopper := make(chan struct{}, ROUTINES)
var counter int
for {
counter++
stopper <- struct{}{}
go func(c int) {
fmt.Println("+ Starting goroutine", c)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Println("- Stopping goroutine", c)
<-stopper
}(counter)
}
}
在此示例中,您将了解如何只能让 ROUTINES 数量的 goroutine 存活 0、1 或 2 秒。在输出中,您还可以看到每次一个 goroutine 结束时另一个 goroutine 是如何启动的。
我正在尝试同时 运行 一个函数。它调用我的数据库可能需要 2-10 秒。我希望它在完成后继续执行下一个例程,即使另一个例程仍在处理中,但只希望它一次最多处理 2 个。我希望这种情况无限期地发生。我觉得我 几乎 在那里,但是 waitGroup 强制两个例程在继续另一个迭代之前等待直到完成。
const ROUTINES = 2;
for {
var wg sync.WaitGroup
_, err:= db.Exec(`Random DB Call`)
if err != nil {
panic(err)
}
ch := createRoutines(db, &wg)
wg.Add(ROUTINES)
for i := 1; i <= ROUTINES; i++ {
ch <- i
time.Sleep(2 * time.Second)
}
close(ch)
wg.Wait()
}
func createRoutines(db *sqlx.DB, wg *sync.WaitGroup) chan int {
var ch = make(chan int, 5)
for i := 0; i < ROUTINES ; i++ {
go func(db *sqlx.DB) {
defer wg.Done()
for {
_, ok := <-ch
if !ok {
return
}
doStuff(db)
}
}(db)
}
return ch
}
这增加了一个外部依赖,但考虑这个实现:
package main
import (
"context"
"database/sql"
"log"
"github.com/MicahParks/ctxerrpool"
)
func main() {
// Create a pool of 2 workers for database queries. Log any errors.
databasePool := ctxerrpool.New(2, func(_ ctxerrpool.Pool, err error) {
log.Printf("Failed to execute database query.\nError: %s", err.Error())
})
// Get a list of queries to execute.
queries := []string{
"SELECT first_name, last_name FROM customers",
"SELECT price FROM inventory WHERE sku='1234'",
"other queries...",
}
// TODO Make a database connection.
var db *sql.DB
for _, query := range queries {
// Intentionally shadow the looped variable for scope.
query := query
// Perform the query on a worker. If no worker is ready, it will block until one is.
databasePool.AddWorkItem(context.TODO(), func(workCtx context.Context) (err error) {
_, err = db.ExecContext(workCtx, query)
return err
})
}
// Wait for all workers to finish.
databasePool.Wait()
}
如果您只需要同时拥有 n 个 goroutines 运行,您可以有一个大小为 n 的缓冲通道,并在没有 space 时使用它来阻止创建新的 goroutines ] 左边是这样的
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
const ROUTINES = 2
rand.Seed(time.Now().UnixNano())
stopper := make(chan struct{}, ROUTINES)
var counter int
for {
counter++
stopper <- struct{}{}
go func(c int) {
fmt.Println("+ Starting goroutine", c)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
fmt.Println("- Stopping goroutine", c)
<-stopper
}(counter)
}
}
在此示例中,您将了解如何只能让 ROUTINES 数量的 goroutine 存活 0、1 或 2 秒。在输出中,您还可以看到每次一个 goroutine 结束时另一个 goroutine 是如何启动的。