有条件地 运行 个连续的 Go 例程
Conditionally Run Consecutive Go Routines
我有以下一段代码。我正在尝试同时 运行 3 个 GO 例程,但不超过三个。这按预期工作,但代码应该是 运行ning 更新数据库中的 table。
所以第一个例程处理前 50 个,然后是第二个 50 个,然后是第三个 50 个,然后重复。我不希望两个例程同时处理相同的行,并且由于更新需要多长时间,几乎每次都会发生这种情况。
为了解决这个问题,我开始用新列 processing
标记行,这是一个布尔值。我将其设置为 true,以便在例程启动时更新所有行,并使脚本休眠 6 秒以允许更新标志。
这在随机的时间内有效,但时不时地,我会看到 2-3 个作业再次处理相同的行。我觉得我用来防止重复更新的方法有点简陋,想知道是否有更好的方法。
stopper := make(chan struct{}, 3)
var counter int
for {
counter++
stopper <- struct{}{}
go func(db *sqlx.DB, c int) {
fmt.Println("start")
updateTables(db)
fmt.Println("stop"b)
<-stopper
}(db, counter)
time.Sleep(6 * time.Second)
}
in updateTables
var ids[]string
err := sqlx.Select(db, &data, `select * from table_data where processing = false `)
if err != nil {
panic(err)
}
for _, row:= range data{
list = append(ids, row.Id)
}
if len(rows) == 0 {
return
}
for _, row:= range data{
_, err = db.Exec(`update table_data set processing = true where id = , row.Id)
if err != nil {
panic(err)
}
}
// Additional row processing
我认为在这种情况下对 go routines 的方法存在误解。
执行此类工作的 Go 例程应该像工作线程一样处理,使用通道作为主例程(将执行同步)和工作程序 Go 例程(将执行同步)之间的通信方法实际工作)。
package main
import (
"log"
"sync"
"time"
)
type record struct {
id int
}
func main() {
const WORKER_COUNT = 10
recordschan := make(chan record)
var wg sync.WaitGroup
for k := 0; k < WORKER_COUNT; k++ {
wg.Add(1)
// Create the worker which will be doing the updates
go func(workerID int) {
defer wg.Done() // Marking the worker as done
for record := range recordschan {
updateRecord(record)
log.Printf("req %d processed by worker %d", record.id, workerID)
}
}(k)
}
// Feeding the records channel
for _, record := range fetchRecords() {
recordschan <- record
}
// Closing our channel as we're not using it anymore
close(recordschan)
// Waiting for all the go routines to finish
wg.Wait()
log.Println("we're done!")
}
func fetchRecords() []record {
result := []record{}
for k := 0; k < 100; k++ {
result = append(result, record{k})
}
return result
}
func updateRecord(req record) {
time.Sleep(200 * time.Millisecond)
}
如果您需要一次更新所有 50 个表,您甚至可以在主 go 例程中缓冲内容。
我有以下一段代码。我正在尝试同时 运行 3 个 GO 例程,但不超过三个。这按预期工作,但代码应该是 运行ning 更新数据库中的 table。
所以第一个例程处理前 50 个,然后是第二个 50 个,然后是第三个 50 个,然后重复。我不希望两个例程同时处理相同的行,并且由于更新需要多长时间,几乎每次都会发生这种情况。
为了解决这个问题,我开始用新列 processing
标记行,这是一个布尔值。我将其设置为 true,以便在例程启动时更新所有行,并使脚本休眠 6 秒以允许更新标志。
这在随机的时间内有效,但时不时地,我会看到 2-3 个作业再次处理相同的行。我觉得我用来防止重复更新的方法有点简陋,想知道是否有更好的方法。
stopper := make(chan struct{}, 3)
var counter int
for {
counter++
stopper <- struct{}{}
go func(db *sqlx.DB, c int) {
fmt.Println("start")
updateTables(db)
fmt.Println("stop"b)
<-stopper
}(db, counter)
time.Sleep(6 * time.Second)
}
in updateTables
var ids[]string
err := sqlx.Select(db, &data, `select * from table_data where processing = false `)
if err != nil {
panic(err)
}
for _, row:= range data{
list = append(ids, row.Id)
}
if len(rows) == 0 {
return
}
for _, row:= range data{
_, err = db.Exec(`update table_data set processing = true where id = , row.Id)
if err != nil {
panic(err)
}
}
// Additional row processing
我认为在这种情况下对 go routines 的方法存在误解。
执行此类工作的 Go 例程应该像工作线程一样处理,使用通道作为主例程(将执行同步)和工作程序 Go 例程(将执行同步)之间的通信方法实际工作)。
package main
import (
"log"
"sync"
"time"
)
type record struct {
id int
}
func main() {
const WORKER_COUNT = 10
recordschan := make(chan record)
var wg sync.WaitGroup
for k := 0; k < WORKER_COUNT; k++ {
wg.Add(1)
// Create the worker which will be doing the updates
go func(workerID int) {
defer wg.Done() // Marking the worker as done
for record := range recordschan {
updateRecord(record)
log.Printf("req %d processed by worker %d", record.id, workerID)
}
}(k)
}
// Feeding the records channel
for _, record := range fetchRecords() {
recordschan <- record
}
// Closing our channel as we're not using it anymore
close(recordschan)
// Waiting for all the go routines to finish
wg.Wait()
log.Println("we're done!")
}
func fetchRecords() []record {
result := []record{}
for k := 0; k < 100; k++ {
result = append(result, record{k})
}
return result
}
func updateRecord(req record) {
time.Sleep(200 * time.Millisecond)
}
如果您需要一次更新所有 50 个表,您甚至可以在主 go 例程中缓冲内容。