Go:将许多慢速 API 查询通过通道汇集到单个 SQL 事务中

Go: channel many slow API queries into single SQL transaction

我想知道下面这种场景的惯用写法是什么。我有 N 个很慢的 API 查询和一个数据库连接,我希望用一个带缓冲的通道来接收这些响应,并用一个数据库事务把数据写入。我只想到了用信号量的办法,如下面这个虚构的示例:

    // myFunc runs N slow API calls with at most 10 in flight, buffers every
    // response in a concurrency-safe map, and then writes all of them inside a
    // single database transaction.
    // NOTE(review): assumes db is a *sql.DB (Begin/Exec/Commit/Rollback as in
    // database/sql) and that MyConcurrentMap can be ranged over — confirm.
    func myFunc() {
        // sem is a counting semaphore: each in-flight call holds one slot.
        sem := make(chan bool, 10)
        // A concurrent safe map as buffer
        var myMap MyConcurrentMap

        for i := 0; i < N; i++ {
            sem <- true
            go func(i int) {
                defer func() { <-sem }()
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
                myMap.Put(resp)
            }(i)
        }

        // Acquiring every slot only succeeds after all goroutines above have
        // released theirs, i.e. once every API call has finished and stored its
        // response.
        for j := 0; j < cap(sem); j++ {
            sem <- true
        }

        tx, err := db.Begin()
        if err != nil {
            fmt.Println("begin transaction:", err)
            return
        }
        for data := range myMap {
            if _, err := tx.Exec("Insert data into database", data); err != nil {
                // Best effort: the Exec error is the one worth reporting.
                _ = tx.Rollback()
                fmt.Println("insert:", err)
                return
            }
        }
        if err := tx.Commit(); err != nil {
            fmt.Println("commit:", err)
        }
    }

我几乎可以肯定有更简单、更干净、更合适的解决方案,但对我来说似乎很难掌握。

编辑:好吧,我想出了下面的方案,这样就不需要缓冲用的 map 了:数据一到达 resp 通道就会被打印出来,也可以直接用来插入数据库。它能运行,但我仍然不确定是否一切正确,以及最后是否存在数据竞争(race)。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// wg is the package-level WaitGroup for the worker goroutines: main adds one
// count per worker and waits on it, and each worker calls wg.Done when it
// returns.
var wg sync.WaitGroup

// init seeds math/rand's global source from the wall clock so every run picks
// different delays in verySlowAPI.
// NOTE(review): rand.Seed is deprecated since Go 1.20, where the global source
// is seeded automatically; this call only matters on older toolchains.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// verySlowAPI emulates a slow remote call: it sleeps for a random whole number
// of seconds in [0, 4] and returns that number. The id argument is ignored.
func verySlowAPI(id int) int {
	secs := rand.Intn(5)
	delay := time.Second * time.Duration(secs)
	time.Sleep(delay)
	return secs
}

// main fans N simulated slow API calls out to a fixed pool of worker
// goroutines and prints each result as soon as it arrives on resp. This
// receive loop is the single place where results could be written into one
// database transaction.
func main() {
	// Number of tasks to run.
	N := 100

	// Number of worker goroutines, i.e. the maximum number of concurrent calls.
	concur := 10

	// Channel for tasks. It does not need to hold all N ids: a dedicated
	// producer goroutine feeds it.
	tasks := make(chan int)

	// Channel for responses, buffered so that workers rarely wait on the printer.
	resp := make(chan int, concur)

	wg.Add(concur)
	for i := 0; i < concur; i++ {
		go worker(tasks, resp)
	}

	// Producer: enqueue every task, then close tasks so the workers exit once
	// the queue is drained. This runs on its own goroutine so it never
	// competes with the receive loop below.
	go func() {
		for i := 0; i < N; i++ {
			tasks <- i
		}
		close(tasks)
	}()

	// Close resp only after every worker has returned. That is the moment no
	// more sends can happen, and it lets the range below terminate.
	go func() {
		wg.Wait()
		close(resp)
	}()

	// Collect data from the workers; exactly N results arrive before resp closes.
	for n := range resp {
		fmt.Printf("%d\n", n)
	}
}

// worker calls verySlowAPI for every id received on task and sends each result
// on resp. It returns once task is closed and drained, and marks the
// package-level wg done on exit.
func worker(task chan int, resp chan<- int) {
	defer wg.Done()
	// range stops when task is closed. This avoids the manual ok-check and the
	// confusing reuse of the name "task" for both the channel and the value.
	for id := range task {
		resp <- verySlowAPI(id)
	}
}

不需要用通道来实现信号量;sync.WaitGroup 正是为等待一组 goroutine 完成而设计的。

如果你是用通道来限制吞吐量,更好的做法是使用工作池(worker pool),并通过通道把任务分发给各个 worker:

// job is one unit of work for the worker pool; i is the request index.
type job struct{ i int }

func myFunc(N int) {
    // Adjust as needed for total number of tasks
    work := make(chan job, 10)
    // res being whatever type slowAPICall returns
    results := make(chan res, 10)
    resBuff := make([]res, 0, N)

    wg := new(sync.WaitGroup)

    // 10 concurrent API calls
    for i = 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            for j := range work {
                resp := slowAPICall(fmt.Sprintf("http://slow-api.me?%d", j.i))
                results <- resp
            }
            wg.Done()
        }()
    }

    go func() {
        for r := range results {
            resBuff = append(resBuff, r)
        }
    }

    for i = 0; i < N; i++ {
        work <- job{i}
    }
    close(work)

    wg.Wait()
    close(results)
}

也许这对你有用。这样就不再需要并发安全的 map 了。代码片段如下:

// myFunc runs N slow API calls with at most 10 in flight, collects every
// response into a slice, and writes them all inside a single transaction.
// NOTE(review): assumes db is a *sql.DB (Begin/Exec/Commit/Rollback as in
// database/sql) — confirm against the caller.
func myFunc() {
	// sem is a counting semaphore limiting concurrent API calls to 10.
	sem := make(chan bool, 10)
	respCh := make(chan YOUR_RESP_TYPE, 10)
	var responses []YOUR_RESP_TYPE

	// The collector must be running BEFORE dispatch starts. Otherwise, once
	// respCh's buffer is full, senders block while still holding their sem
	// slot, and the dispatch loop below deadlocks as soon as N > 20.
	respCollected := make(chan struct{})
	go func() {
		defer close(respCollected)
		for i := 0; i < N; i++ {
			responses = append(responses, <-respCh)
		}
	}()

	for i := 0; i < N; i++ {
		sem <- true
		go func(i int) {
			defer func() { <-sem }()
			respCh <- slowAPICall(fmt.Sprintf("http://slow-api.me?%d", i))
		}(i)
	}

	// After this, the collector has exited and responses holds all N results.
	<-respCollected

	tx, err := db.Begin()
	if err != nil {
		fmt.Println("begin transaction:", err)
		return
	}
	for _, data := range responses {
		if _, err := tx.Exec("Insert data into database", data); err != nil {
			// Best effort: the Exec error is the one worth reporting.
			_ = tx.Rollback()
			fmt.Println("insert:", err)
			return
		}
	}
	if err := tx.Commit(); err != nil {
		fmt.Println("commit:", err)
	}
}

我们还需要一个额外的 goroutine,把响应通道中的所有响应收集到一个切片(或 map)中。