有条件地 运行 个连续的 Go 例程

Conditionally Run Consecutive Go Routines

我有以下一段代码。我正在尝试同时 运行 3 个 GO 例程,但不超过三个。这按预期工作,但代码应该是 运行ning 更新数据库中的 table。

所以第一个例程处理前 50 个,然后是第二个 50 个,然后是第三个 50 个,然后重复。我不希望两个例程同时处理相同的行,并且由于更新需要多长时间,几乎每次都会发生这种情况。

为了解决这个问题,我开始用新列 processing 标记行,这是一个布尔值。我将其设置为 true,以便在例程启动时更新所有行,并使脚本休眠 6 秒以允许更新标志。

这在随机的时间内有效,但时不时地,我会看到 2-3 个作业再次处理相同的行。我觉得我用来防止重复更新的方法有点简陋,想知道是否有更好的方法。

stopper := make(chan struct{}, 3)
var counter int
for {
    counter++
    stopper <- struct{}{}
    go func(db *sqlx.DB, c int) {
        fmt.Println("start")
        updateTables(db)
        fmt.Println("stop"b)
        <-stopper
    }(db, counter)
    time.Sleep(6 * time.Second)

}

in updateTables

var ids[]string
err := sqlx.Select(db, &data, `select * from table_data where processing = false `)
    if err != nil {
        panic(err)
    }

    for _, row:= range data{
        list = append(ids, row.Id)
    }
    if len(rows) == 0 {
        return
    }

    for _, row:= range data{
        _, err = db.Exec(`update table_data set processing = true where id = , row.Id)
        if err != nil {
            panic(err)
        }
    }
    // Additional row processing

我认为在这种情况下对 go routines 的方法存在误解。

执行此类工作的 Go 例程应该像工作线程一样处理,使用通道作为主例程(将执行同步)和工作程序 Go 例程(将执行同步)之间的通信方法实际工作)。

package main

import (
    "log"
    "sync"
    "time"
)

type record struct {
    id int
}

func main() {
    const WORKER_COUNT = 10

    recordschan := make(chan record)

    var wg sync.WaitGroup
    for k := 0; k < WORKER_COUNT; k++ {
        wg.Add(1)
        // Create the worker which will be doing the updates
        go func(workerID int) {
            defer wg.Done() // Marking the worker as done
            for record := range recordschan {
                updateRecord(record)
                log.Printf("req %d processed by worker %d", record.id, workerID)
            }
        }(k)
    }

    // Feeding the records channel
    for _, record := range fetchRecords() {
        recordschan <- record
    }

    // Closing our channel as we're not using it anymore
    close(recordschan)

    // Waiting for all the go routines to finish
    wg.Wait()

    log.Println("we're done!")
}

func fetchRecords() []record {
    result := []record{}
    for k := 0; k < 100; k++ {
        result = append(result, record{k})
    }
    return result
}

func updateRecord(req record) {
    time.Sleep(200 * time.Millisecond)
}

如果您需要一次更新所有 50 个表,您甚至可以在主 go 例程中缓冲内容。