执行 routine:Making 并发 API 请求
Go routine:Making concurrent API requests
我正在尝试了解通道和 goroutines,并尝试编写一个 goroutine 来向服务器发出并发 API 请求
但是当我 运行 使用 goroutine 的代码时,它似乎花费了与没有 goroutine 相同的时间。
func sendUser(user string, ch chan<- string) {
resp,err := http.get("URL"/user)
//do the processing and get resp=string
ch <- resp
}
func AsyncHTTP(users []string) ([]string, error) {
ch := make(chan string)
var responses []string
var user string
for _ , user = range users {
go sendUser(user, ch)
for {
select {
case r := <-ch:
if r.err != nil {
fmt.Println(r.err)
}
responses = append(responses, r)
**//Is there a better way to show that the processing of response is complete**?
if len(responses) == len(users) {
return responses, nil
}
case <-time.After(50 * time.Millisecond):
fmt.Printf(".")
}
}
}
return responses, nil
}
问题:
即使我使用了 goroutine,请求完成时间与没有 goroutine 时一样吗?我对 goroutine 做错了什么吗?
为了告诉工作不要再在这里等待我正在使用:
if len(responses) == len(users)
有没有更好的方法来显示response的处理完成,告诉ch不要再等了?
什么是wait.Syncgroup?我如何在我的 goroutine 中使用它?
我可能会做这样的事情..
func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := http.Get("URL/" + user)
if err != nil {
log.Println("err handle it")
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("err handle it")
}
ch <- string(b)
}
func AsyncHTTP(users []string) ([]string, error) {
ch := make(chan string)
var responses []string
var user string
var wg sync.WaitGroup
for _, user = range users {
wg.Add(1)
go sendUser(user, ch, &wg)
}
// close the channel in the background
go func() {
wg.Wait()
close(ch)
}()
// read from channel as they come in until its closed
for res := range ch {
responses = append(responses, res)
}
return responses, nil
}
它允许在发送时从频道中读取。通过使用等待组,我将知道何时关闭通道。通过将 waitgroup 和 close 放在一个 goroutine 中,我可以从 "realtime" 中的通道读取而不会阻塞。
对于有界并行/速率限制,我们可以看一个例子 https://blog.golang.org/pipelines#TOC_9.
基本上步骤是:
- 将用于调用 API 的输入/参数/参数流式传输到输入通道。
- 运行
N
个 worker goroutine,每个都使用相同的(共享的)输入通道。从输入通道获取参数,调用 API,将结果发送到结果通道。
- 使用结果通道,return有错误尽早
sync.WaitGroup
用于等待所有worker goroutine完成(输入通道耗尽后)。
下面是它的代码示例(您可以 运行 马上,尝试将 NUM_PARALLEL
更改为不同的并行数)。将 BASE_URL
更改为您的基础 url.
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
)
// placeholder url. Change it to your base url.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"
// number of parallelism
const NUM_PARALLEL = 20
// Stream inputs to input channel
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
inputCh := make(chan string)
go func() {
defer close(inputCh)
for _, input := range inputs {
select {
case inputCh <- input:
case <-done:
// in case done is closed prematurely (because error midway),
// finish the loop (closing input channel)
break
}
}
}()
return inputCh
}
// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
url := BASE_URL + user
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
bodyStr := string(body)
return bodyStr, nil
}
// Wrapper for sendUser return value, used as result channel type
type result struct {
bodyStr string
err error
}
func AsyncHTTP(users []string) ([]string, error) {
done := make(chan struct{})
defer close(done)
inputCh := streamInputs(done, users)
var wg sync.WaitGroup
// bulk add goroutine counter at the start
wg.Add(NUM_PARALLEL)
resultCh := make(chan result)
for i := 0; i < NUM_PARALLEL; i++ {
// spawn N worker goroutines, each is consuming a shared input channel.
go func() {
for input := range inputCh {
bodyStr, err := sendUser(input)
resultCh <- result{bodyStr, err}
}
wg.Done()
}()
}
// Wait all worker goroutines to finish. Happens if there's no error (no early return)
go func() {
wg.Wait()
close(resultCh)
}()
results := []string{}
for result := range resultCh {
if result.err != nil {
// return early. done channel is closed, thus input channel is also closed.
// all worker goroutines stop working (because input channel is closed)
return nil, result.err
}
results = append(results, result.bodyStr)
}
return results, nil
}
func main() {
// populate users param
users := []string{}
for i := 1; i <= 100; i++ {
users = append(users, strconv.Itoa(i))
}
start := time.Now()
results, err := AsyncHTTP(users)
if err != nil {
fmt.Println(err)
return
}
for _, result := range results {
fmt.Println(result)
}
fmt.Println("finished in ", time.Since(start))
}
我正在尝试了解通道和 goroutines,并尝试编写一个 goroutine 来向服务器发出并发 API 请求
但是当我 运行 使用 goroutine 的代码时,它似乎花费了与没有 goroutine 相同的时间。
func sendUser(user string, ch chan<- string) {
resp,err := http.get("URL"/user)
//do the processing and get resp=string
ch <- resp
}
func AsyncHTTP(users []string) ([]string, error) {
ch := make(chan string)
var responses []string
var user string
for _ , user = range users {
go sendUser(user, ch)
for {
select {
case r := <-ch:
if r.err != nil {
fmt.Println(r.err)
}
responses = append(responses, r)
**//Is there a better way to show that the processing of response is complete**?
if len(responses) == len(users) {
return responses, nil
}
case <-time.After(50 * time.Millisecond):
fmt.Printf(".")
}
}
}
return responses, nil
}
问题:
即使我使用了 goroutine,请求完成时间与没有 goroutine 时一样吗?我对 goroutine 做错了什么吗?
为了告诉工作不要再在这里等待我正在使用:
if len(responses) == len(users)
有没有更好的方法来显示response的处理完成,告诉ch不要再等了?
什么是wait.Syncgroup?我如何在我的 goroutine 中使用它?
我可能会做这样的事情..
func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := http.Get("URL/" + user)
if err != nil {
log.Println("err handle it")
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("err handle it")
}
ch <- string(b)
}
func AsyncHTTP(users []string) ([]string, error) {
ch := make(chan string)
var responses []string
var user string
var wg sync.WaitGroup
for _, user = range users {
wg.Add(1)
go sendUser(user, ch, &wg)
}
// close the channel in the background
go func() {
wg.Wait()
close(ch)
}()
// read from channel as they come in until its closed
for res := range ch {
responses = append(responses, res)
}
return responses, nil
}
它允许在发送时从频道中读取。通过使用等待组,我将知道何时关闭通道。通过将 waitgroup 和 close 放在一个 goroutine 中,我可以从 "realtime" 中的通道读取而不会阻塞。
对于有界并行/速率限制,我们可以看一个例子 https://blog.golang.org/pipelines#TOC_9.
基本上步骤是:
- 将用于调用 API 的输入/参数/参数流式传输到输入通道。
- 运行
N
个 worker goroutine,每个都使用相同的(共享的)输入通道。从输入通道获取参数,调用 API,将结果发送到结果通道。 - 使用结果通道,return有错误尽早
sync.WaitGroup
用于等待所有worker goroutine完成(输入通道耗尽后)。
下面是它的代码示例(您可以 运行 马上,尝试将 NUM_PARALLEL
更改为不同的并行数)。将 BASE_URL
更改为您的基础 url.
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
)
// placeholder url. Change it to your base url.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"
// number of parallelism
const NUM_PARALLEL = 20
// Stream inputs to input channel
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
inputCh := make(chan string)
go func() {
defer close(inputCh)
for _, input := range inputs {
select {
case inputCh <- input:
case <-done:
// in case done is closed prematurely (because error midway),
// finish the loop (closing input channel)
break
}
}
}()
return inputCh
}
// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
url := BASE_URL + user
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
bodyStr := string(body)
return bodyStr, nil
}
// Wrapper for sendUser return value, used as result channel type
type result struct {
bodyStr string
err error
}
func AsyncHTTP(users []string) ([]string, error) {
done := make(chan struct{})
defer close(done)
inputCh := streamInputs(done, users)
var wg sync.WaitGroup
// bulk add goroutine counter at the start
wg.Add(NUM_PARALLEL)
resultCh := make(chan result)
for i := 0; i < NUM_PARALLEL; i++ {
// spawn N worker goroutines, each is consuming a shared input channel.
go func() {
for input := range inputCh {
bodyStr, err := sendUser(input)
resultCh <- result{bodyStr, err}
}
wg.Done()
}()
}
// Wait all worker goroutines to finish. Happens if there's no error (no early return)
go func() {
wg.Wait()
close(resultCh)
}()
results := []string{}
for result := range resultCh {
if result.err != nil {
// return early. done channel is closed, thus input channel is also closed.
// all worker goroutines stop working (because input channel is closed)
return nil, result.err
}
results = append(results, result.bodyStr)
}
return results, nil
}
func main() {
// populate users param
users := []string{}
for i := 1; i <= 100; i++ {
users = append(users, strconv.Itoa(i))
}
start := time.Now()
results, err := AsyncHTTP(users)
if err != nil {
fmt.Println(err)
return
}
for _, result := range results {
fmt.Println(result)
}
fmt.Println("finished in ", time.Since(start))
}