Go routine:发出并发 API 请求

Go routine:Making concurrent API requests

我正在尝试了解通道和 goroutines,并尝试编写一个 goroutine 来向服务器发出并发 API 请求

但是当我运行使用 goroutine 的代码时,它花费的时间似乎与不使用 goroutine 时相同。

// sendUser fetches the given user from the API and sends the response
// body (or the error text) on ch, so the collector always receives
// exactly one message per call.
func sendUser(user string, ch chan<- string) {
	// http.Get (capital G) — Go is case-sensitive; the URL is built by
	// string concatenation, not the invalid expression `"URL"/user`.
	resp, err := http.Get("URL/" + user)
	if err != nil {
		ch <- err.Error()
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		ch <- err.Error()
		return
	}
	ch <- string(body)
}


func AsyncHTTP(users []string) ([]string, error) {
    ch := make(chan string)
    var responses []string
    var user string

    for _ , user = range users {
        go sendUser(user, ch)

        for {
            select {
            case r := <-ch:
                if r.err != nil {
                    fmt.Println(r.err)
                }
                responses = append(responses, r)
                **//Is there a better way to show that the processing of response is complete**?
                if len(responses) == len(users) { 
                    return responses, nil
                }
            case <-time.After(50 * time.Millisecond):
                fmt.Printf(".")
            }
        }
    }
    return responses, nil
}

问题:

  1. 即使我使用了 goroutine,为什么请求完成的总耗时仍与不使用 goroutine 时一样?我的 goroutine 用法哪里有问题?

  2. 为了告诉程序不要再继续等待,我目前使用的判断是:

    if len(responses) == len(users)
    

    有没有更好的方法来表明 response 已处理完成,从而告诉接收方不必再等待 ch?

  3. 什么是 sync.WaitGroup?我如何在我的 goroutine 中使用它?

我可能会做这样的事情..

func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get("URL/" + user)
    if err != nil {
        log.Println("err handle it")
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Println("err handle it")
    }
    ch <- string(b)
}

// AsyncHTTP launches one goroutine per user and reads responses from the
// channel as they arrive; a WaitGroup tells a background goroutine when
// it is safe to close the channel, which terminates the collection loop.
func AsyncHTTP(users []string) ([]string, error) {
	ch := make(chan string)
	var wg sync.WaitGroup

	// Launch all workers first; the loop variable is passed by value so
	// each goroutine gets its own copy (no shared-variable capture —
	// the original's predeclared `var user string` invited exactly that
	// bug and is removed).
	for _, user := range users {
		wg.Add(1)
		go sendUser(user, ch, &wg)
	}

	// Close the channel in the background once every worker has finished,
	// so the range loop below knows when to stop.
	go func() {
		wg.Wait()
		close(ch)
	}()

	// Read from the channel as results come in, until it is closed.
	responses := make([]string, 0, len(users))
	for res := range ch {
		responses = append(responses, res)
	}
	return responses, nil
}

它允许在结果发送的同时就从通道中读取。通过使用 WaitGroup,我可以知道何时应该关闭通道。把 wg.Wait() 和 close 放在一个单独的 goroutine 中,就能"实时"地从通道读取而不会阻塞。

对于有界并行/速率限制,我们可以看一个例子 https://blog.golang.org/pipelines#TOC_9.

基本上步骤是:

  1. 将用于调用 API 的输入/参数/参数流式传输到输入通道。
  2. 运行 N 个 worker goroutine,每个都使用相同的(共享的)输入通道。从输入通道获取参数,调用 API,将结果发送到结果通道。
  3. 消费结果通道;一旦遇到错误就尽早 return。

sync.WaitGroup用于等待所有worker goroutine完成(输入通道耗尽后)。

下面是它的代码示例(您可以马上运行,并尝试把 NUM_PARALLEL 改成不同的并行数)。请将 BASE_URL 改为您自己的基础 URL。

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "sync"
    "time"
)

// BASE_URL is a placeholder API endpoint; change it to your base URL.
// NOTE(review): idiomatic Go would name these BaseURL / NumParallel
// (MixedCaps, not ALL_CAPS) — kept as-is because later blocks reference
// these exact names.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"

// NUM_PARALLEL is the number of concurrent worker goroutines.
const NUM_PARALLEL = 20

// streamInputs feeds each input into the returned channel from a
// background goroutine. The channel is closed when all inputs have been
// sent, or as soon as done is closed — whichever comes first.
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
	inputCh := make(chan string)
	go func() {
		defer close(inputCh)
		for _, input := range inputs {
			select {
			case inputCh <- input:
			case <-done:
				// done closed prematurely (error midway): stop streaming.
				// A bare `break` here would only exit the select, not the
				// for loop, so the goroutine kept sending; `return` exits
				// the loop and the deferred close still runs.
				return
			}
		}
	}()
	return inputCh
}

// sendUser performs a plain HTTP GET for one user and returns the
// response body as a string. It is an ordinary synchronous function with
// no knowledge of goroutines or channels.
func sendUser(user string) (string, error) {
	resp, err := http.Get(BASE_URL + user)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

// result wraps sendUser's two return values so a single channel can carry
// both the response body and any error back to the collecting goroutine.
type result struct {
	bodyStr string // response body on success; empty when err is non-nil
	err     error  // non-nil if the HTTP call or body read failed
}

// AsyncHTTP fans users out to NUM_PARALLEL worker goroutines and collects
// the response bodies. On the first error it returns early; the deferred
// close(done) then shuts down the input stream AND unblocks every worker.
func AsyncHTTP(users []string) ([]string, error) {
	done := make(chan struct{})
	defer close(done)

	inputCh := streamInputs(done, users)

	var wg sync.WaitGroup
	// bulk add goroutine counter at the start
	wg.Add(NUM_PARALLEL)

	resultCh := make(chan result)

	for i := 0; i < NUM_PARALLEL; i++ {
		// Spawn N worker goroutines, each consuming the shared input channel.
		go func() {
			defer wg.Done()
			for input := range inputCh {
				bodyStr, err := sendUser(input)
				// Select on done when sending: if the collector already
				// returned early (on error), nobody reads resultCh anymore
				// and a bare `resultCh <- ...` would block this goroutine
				// forever — a goroutine leak in the original code.
				select {
				case resultCh <- result{bodyStr, err}:
				case <-done:
					return
				}
			}
		}()
	}

	// Close resultCh once all workers exit, ending the collection loop
	// below in the no-error case.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	results := make([]string, 0, len(users))
	for res := range resultCh {
		if res.err != nil {
			// Early return: close(done) (deferred) stops streamInputs and
			// unblocks the workers; the waiter goroutine closes resultCh.
			return nil, res.err
		}
		results = append(results, res.bodyStr)
	}

	return results, nil
}

func main() {
	// Build 100 numeric user IDs ("1" … "100") as request parameters.
	users := make([]string, 0, 100)
	for i := 1; i <= 100; i++ {
		users = append(users, strconv.Itoa(i))
	}

	start := time.Now()

	results, err := AsyncHTTP(users)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Print every response body, then the total elapsed time.
	for _, r := range results {
		fmt.Println(r)
	}

	fmt.Println("finished in ", time.Since(start))
}