如何高效地并行化数组列表并控制并行度?
How to efficiently parallelize array list and control the parallelism?
我有一个 resourceId
数组,我需要它并行循环。并为每个资源生成URL
,然后放入一个映射中,该映射是键(resourcId),值为url。
我得到了下面的代码来完成这项工作,但我不确定这是否是正确的方法。我在这里使用 sizedwaitgroup 来并行化 resourceId
列表。并且在将数据写入地图时还使用地图上的锁。我确信这不是高效的代码,因为使用 lock 然后使用 sizedwaitgroup 会有一些性能问题。
最好和最有效的方法是什么?我应该在这里使用通道(channel)吗?我想控制并发度,而不是让它等于 resourceId
列表的长度。如果任何 resourceId
的 url 生成失败,我想将其记录为该 resourceId 的错误,但不要中断其他正在并行运行、为其余 resourceId 生成 url 的 goroutine。
例如:如果有 10 个资源,其中 2 个失败,则记录这 2 个的错误,地图应该有剩余 8 个的条目。
// Run at most 20 URL generations concurrently; sizedwaitgroup bounds
// the number of in-flight goroutines (Add blocks once 20 are running).
swg := sizedwaitgroup.New(20)
var mutex = &sync.Mutex{}
start := time.Now()
m := make(map[string]*customerPbV1.CustomerResponse)
for _, resources := range resourcesList {
	swg.Add()
	go func(resources string) {
		defer swg.Done()
		customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
		if err != nil {
			// BUG FIX: the original discarded the wrapped error and then
			// fell through to store a nil response in the map. Report the
			// failure and skip the write so failed resources simply have
			// no map entry.
			fmt.Println(errs.NewWithCausef(err, "Could not generate the url for %s", resources))
			return
		}
		// The map is shared by all goroutines, so writes must be serialized.
		mutex.Lock()
		m[resources] = customerUrl
		mutex.Unlock()
	}(resources)
}
swg.Wait()
elapsed := time.Since(start)
fmt.Println(elapsed)
注意:以上代码将以高吞吐量从多个 reader 线程中调用,因此需要表现良好。
我不确定 sizedwaitgroup
是什么,也没有解释,但总的来说,这种方法看起来不是很典型的 Go。就此而言,“最佳”是一个见仁见智的问题,但 Go 中最典型的方法是遵循以下思路:
// main fans resource IDs out to a fixed pool of workers and collects
// the generated URLs into a map from a single goroutine (no mutex needed).
func main() {
	wg := new(sync.WaitGroup)
	start := time.Now()

	numWorkers := 20
	m := make(map[string]*customerPbV1.CustomerResponse)
	work := make(chan string)
	results := make(chan result)

	// Fixed worker pool: concurrency is bounded by numWorkers, not by
	// the length of resourcesList.
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			// BUG FIX: the original never called wg.Done(), so wg.Wait()
			// blocked forever, results was never closed, and the collector
			// loop below deadlocked.
			defer wg.Done()
			worker(work, results)
		}()
	}

	// Feed the work channel from its own goroutine, then close it so the
	// workers' range loops terminate.
	go func() {
		for _, resources := range resourcesList {
			work <- resources
		}
		close(work)
	}()

	// Close results once every worker has exited so the range below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Only this goroutine writes to the map, so no lock is required.
	for result := range results {
		m[result.resources] = result.response
	}

	elapsed := time.Since(start)
	fmt.Println(elapsed)
}
// result pairs a resource ID with the URL response generated for it, so
// a single collector goroutine can populate the map without locking.
type result struct {
resources string
response *customerPbV1.CustomerResponse
}
// worker consumes resource IDs from ch, generates a URL for each, and
// sends successful results on r. It returns when ch is closed.
func worker(ch chan string, r chan result) {
	for w := range ch {
		customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
		if err != nil {
			// BUG FIX: the original referenced an undefined variable
			// `resources`; the loop variable here is `w`.
			// NOTE(review): the constructed error is still discarded —
			// route it to a logger or an error channel if callers need
			// failures reported.
			errs.NewWithCausef(err, "Could not generate the url for %s", w)
			continue
		}
		r <- result{w, customerUrl}
	}
}
(虽然,根据名称,我猜测 errs.NewWithCausef
实际上并不处理错误,而是返回一个错误;在这种情况下,当前代码会把这些错误直接丢弃,更妥当的方案是增加一个 chan error
来处理错误:
// main fans resource IDs out to a fixed pool of workers, collecting
// successful URLs into a map and draining failures from an error channel.
func main() {
	wg := new(sync.WaitGroup)
	start := time.Now()

	numWorkers := 20
	m := make(map[string]*customerPbV1.CustomerResponse)
	work := make(chan string)
	results := make(chan result)
	errors := make(chan error)

	// Fixed worker pool bounds the concurrency at numWorkers.
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			// BUG FIX: the original never called wg.Done(), so the
			// channel-closing goroutine below deadlocked on wg.Wait().
			defer wg.Done()
			worker(work, results, errors)
		}()
	}

	// Feed the work channel, then close it so workers terminate.
	go func() {
		for _, resources := range resourcesList {
			work <- resources
		}
		close(work)
	}()

	// Close both output channels once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
		close(errors)
	}()

	// Drain errors so workers never block when sending a failure.
	go func() {
		for err := range errors {
			// BUG FIX: an empty loop body left `err` declared and unused,
			// which does not compile; log it (replace with real handling).
			fmt.Println(err)
		}
	}()

	// Only this goroutine writes to the map, so no mutex is required.
	for result := range results {
		m[result.resources] = result.response
	}

	elapsed := time.Since(start)
	fmt.Println(elapsed)
}
// result pairs a resource ID with the URL response generated for it, so
// a single collector goroutine can populate the map without locking.
type result struct {
resources string
response *customerPbV1.CustomerResponse
}
// worker consumes resource IDs from ch, sending successful results on r
// and wrapped failures on errCh. It returns when ch is closed.
func worker(ch chan string, r chan result, errCh chan error) {
	for w := range ch {
		customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
		if err != nil {
			// BUG FIX: the parameter was named `errs`, shadowing the errs
			// package and breaking the errs.NewWithCausef call; it also
			// referenced the undefined `resources` instead of `w`.
			errCh <- errs.NewWithCausef(err, "Could not generate the url for %s", w)
			continue
		}
		r <- result{w, customerUrl}
	}
}
我已经创建了带有注释的示例代码。
请阅读评论。
note: query function will sleep in 1 second.
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"runtime"
"strconv"
"sync"
"time"
)
// Result carries one job's outcome through the pipeline: the resource
// ID, its computed value, and the error (if any) so failures don't stop
// other jobs.
type Result struct {
resource string
val int
err error
}
/*
CHANGE Result struct to this
result struct will collect all you need to create map
type Result struct {
resources string
customerUrl *customerPbV1.CustomerResponse
err error
}
*/
// const numWorker = 8
// main demonstrates a bounded worker pool: a producer feeds 20 jobs, a
// pool of runtime.NumCPU() workers processes them concurrently, and a
// single collector goroutine builds the result map (no mutex needed).
func main() {
	now := time.Now()
	rand.Seed(time.Now().UnixNano())

	m := make(map[string]int)
	// m := make(map[string]*customerPbV1.CustomerResponse) // CHANGE TO THIS

	numWorker := runtime.NumCPU()
	fmt.Println(numWorker)
	chanResult := make(chan Result)

	// Producer: push the 20 jobs, then close so workers' range loops end.
	go func() {
		for i := 0; i < 20; i++ {
			/*
				customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
				we assume i is resources
				chanResult <- Result{resource: strconv.Itoa(i)}
			*/
			// Each send blocks until a worker receives it, which naturally
			// throttles the producer to the pool's pace.
			chanResult <- Result{
				resource: strconv.Itoa(i),
			}
		}
		close(chanResult)
	}()

	var wg sync.WaitGroup
	cr := make(chan Result)
	wg.Add(numWorker)
	// Close cr only after every worker has finished so the collector
	// loop below terminates. NOTE: don't forget to close cr.
	go func() {
		wg.Wait()
		close(cr)
	}()

	// Worker pool: numWorker goroutines drain chanResult concurrently.
	go func() {
		for i := 0; i < numWorker; i++ {
			go func(workerID int) {
				for job := range chanResult {
					log.Println("worker", workerID, "working on", job.resource)
					// BUG FIX: the original wrote `x, err := query(...)`,
					// shadowing the worker index `x`; use a distinct name.
					val, err := query(job.resource) // TODO: customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
					// Send blocks until the collector in main consumes it.
					cr <- Result{
						resource: job.resource,
						val:      val,
						err:      err,
					}
				}
				wg.Done()
			}(i)
		}
	}()

	counterTotal := 0
	counterSuccess := 0
	// Collector: the only goroutine touching the map, so no lock needed.
	for res := range cr {
		if res.err != nil {
			log.Printf("error found %s. stack trace: %s", res.resource, res.err)
		} else {
			m[res.resource] = res.val // NOTE: save to map
			counterSuccess++
		}
		counterTotal++
	}

	log.Printf("%d/%d of total job run", counterSuccess, counterTotal)
	fmt.Println("final :", m)
	fmt.Println("len m", len(m))
	fmt.Println(runtime.NumGoroutine())
	fmt.Println(time.Since(now))
}
// query simulates a slow lookup for resource s: it sleeps one second,
// fails for non-numeric input and for multiples of three, and otherwise
// returns a pseudo-random value derived from s.
func query(s string) (int, error) {
	time.Sleep(time.Second)

	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, err
	}

	if n%3 == 0 {
		return 0, errors.New("i divided by 3")
	}

	return n + 500 + rand.Intn(500), nil
}
这里是纯通道(channel)解决方案(playground)。
我认为性能确实取决于 GenerateUrl
或我的代码 generateURL
。
我还想指出的另一件事是,正确的术语是 concurrency not parallelism.
package main
import (
"errors"
"log"
"strconv"
"strings"
)
// result carries one resource's outcome to the collector: its ID, the
// generated URL, and the error (if any) so failures don't stop others.
type result struct {
resourceID, url string
err error
}
// generateURL returns the URL for resourceID, failing for IDs that
// carry the "error-" prefix (used here to simulate generation failures).
func generateURL(resourceID string) (string, error) {
	if !strings.HasPrefix(resourceID, "error-") {
		return resourceID, nil
	}
	return "", errors.New(resourceID)
}
// main demonstrates a pure-channel fan-out: 20 worker goroutines each
// own an input channel, results funnel back on ch, and a done channel
// tells the workers to exit once everything has been processed.
func main() {
	// Build the test resource IDs: every 10th one is rigged to fail.
	resources := make([]string, 10000)
	for i := 0; i < 10000; i++ {
		s := strconv.Itoa(i)
		if i%10 == 0 {
			resources[i] = "error-" + s
		} else {
			resources[i] = "id-" + s
		}
	}

	numOfChannel := 20

	// Results flow through ch to the single map-owning loop below.
	ch := make(chan result, 10)
	// Each worker goroutine receives resource IDs on its own channel.
	channels := make([]chan string, numOfChannel)
	// Closed after all resources are processed to signal workers to exit.
	done := make(chan struct{})

	for i := range channels {
		c := make(chan string)
		channels[i] = c
		go func() {
			for {
				select {
				case rid := <-c:
					u, err := generateURL(rid)
					ch <- result{rid, u, err}
				case <-done:
					// BUG FIX: the original used `break` here, which only
					// exits the select statement, not the for loop — every
					// worker goroutine leaked. Return instead.
					return
				}
			}
		}()
	}

	// Distribute the work round-robin across the worker channels.
	go func() {
		for i, r := range resources {
			channels[i%numOfChannel] <- r
		}
	}()

	resourceMap := make(map[string]string)
	i := 0
	for p := range ch {
		if p.err != nil {
			log.Println(p.resourceID, p.err)
		} else {
			resourceMap[p.resourceID] = p.url
		}
		i++
		// BUG FIX: the original stopped at len(resources)-1, dropping the
		// final result and leaving one worker blocked on its send.
		if i == len(resources) {
			break
		}
	}
	close(done)
}
我有一个 resourceId
数组,我需要它并行循环。并为每个资源生成URL
,然后放入一个映射中,该映射是键(resourcId),值为url。
我得到了下面的代码来完成这项工作,但我不确定这是否是正确的方法。我在这里使用 sizedwaitgroup 来并行化 resourceId
列表。并且在将数据写入地图时还使用地图上的锁。我确信这不是高效的代码,因为使用 lock 然后使用 sizedwaitgroup 会有一些性能问题。
最好和最有效的方法是什么?我应该在这里使用通道(channel)吗?我想控制并发度,而不是让它等于 resourceId
列表的长度。如果任何 resourceId
的 url 生成失败,我想将其记录为该 resourceId 的错误,但不要中断其他正在并行运行、为其余 resourceId 生成 url 的 goroutine。
例如:如果有 10 个资源,其中 2 个失败,则记录这 2 个的错误,地图应该有剩余 8 个的条目。
// Run at most 20 URL generations concurrently; sizedwaitgroup bounds
// the number of in-flight goroutines (Add blocks once 20 are running).
swg := sizedwaitgroup.New(20)
var mutex = &sync.Mutex{}
start := time.Now()
m := make(map[string]*customerPbV1.CustomerResponse)
for _, resources := range resourcesList {
	swg.Add()
	go func(resources string) {
		defer swg.Done()
		customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
		if err != nil {
			// BUG FIX: the original discarded the wrapped error and then
			// fell through to store a nil response in the map. Report the
			// failure and skip the write so failed resources simply have
			// no map entry.
			fmt.Println(errs.NewWithCausef(err, "Could not generate the url for %s", resources))
			return
		}
		// The map is shared by all goroutines, so writes must be serialized.
		mutex.Lock()
		m[resources] = customerUrl
		mutex.Unlock()
	}(resources)
}
swg.Wait()
elapsed := time.Since(start)
fmt.Println(elapsed)
注意:以上代码将以高吞吐量从多个 reader 线程中调用,因此需要表现良好。
我不确定 sizedwaitgroup
是什么,也没有解释,但总的来说,这种方法看起来不是很典型的 Go。就此而言,“最佳”是一个见仁见智的问题,但 Go 中最典型的方法是遵循以下思路:
// main fans resource IDs out to a fixed pool of workers and collects
// the generated URLs into a map from a single goroutine (no mutex needed).
func main() {
	wg := new(sync.WaitGroup)
	start := time.Now()

	numWorkers := 20
	m := make(map[string]*customerPbV1.CustomerResponse)
	work := make(chan string)
	results := make(chan result)

	// Fixed worker pool: concurrency is bounded by numWorkers, not by
	// the length of resourcesList.
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			// BUG FIX: the original never called wg.Done(), so wg.Wait()
			// blocked forever, results was never closed, and the collector
			// loop below deadlocked.
			defer wg.Done()
			worker(work, results)
		}()
	}

	// Feed the work channel from its own goroutine, then close it so the
	// workers' range loops terminate.
	go func() {
		for _, resources := range resourcesList {
			work <- resources
		}
		close(work)
	}()

	// Close results once every worker has exited so the range below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Only this goroutine writes to the map, so no lock is required.
	for result := range results {
		m[result.resources] = result.response
	}

	elapsed := time.Since(start)
	fmt.Println(elapsed)
}
// result pairs a resource ID with the URL response generated for it, so
// a single collector goroutine can populate the map without locking.
type result struct {
resources string
response *customerPbV1.CustomerResponse
}
// worker consumes resource IDs from ch, generates a URL for each, and
// sends successful results on r. It returns when ch is closed.
func worker(ch chan string, r chan result) {
	for w := range ch {
		customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
		if err != nil {
			// BUG FIX: the original referenced an undefined variable
			// `resources`; the loop variable here is `w`.
			// NOTE(review): the constructed error is still discarded —
			// route it to a logger or an error channel if callers need
			// failures reported.
			errs.NewWithCausef(err, "Could not generate the url for %s", w)
			continue
		}
		r <- result{w, customerUrl}
	}
}
(虽然,根据名称,我猜测 errs.NewWithCausef
实际上并不处理错误,而是返回一个错误;在这种情况下,当前代码会把这些错误直接丢弃,更妥当的方案是增加一个 chan error
来处理错误:
// main fans resource IDs out to a fixed pool of workers, collecting
// successful URLs into a map and draining failures from an error channel.
func main() {
	wg := new(sync.WaitGroup)
	start := time.Now()

	numWorkers := 20
	m := make(map[string]*customerPbV1.CustomerResponse)
	work := make(chan string)
	results := make(chan result)
	errors := make(chan error)

	// Fixed worker pool bounds the concurrency at numWorkers.
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			// BUG FIX: the original never called wg.Done(), so the
			// channel-closing goroutine below deadlocked on wg.Wait().
			defer wg.Done()
			worker(work, results, errors)
		}()
	}

	// Feed the work channel, then close it so workers terminate.
	go func() {
		for _, resources := range resourcesList {
			work <- resources
		}
		close(work)
	}()

	// Close both output channels once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
		close(errors)
	}()

	// Drain errors so workers never block when sending a failure.
	go func() {
		for err := range errors {
			// BUG FIX: an empty loop body left `err` declared and unused,
			// which does not compile; log it (replace with real handling).
			fmt.Println(err)
		}
	}()

	// Only this goroutine writes to the map, so no mutex is required.
	for result := range results {
		m[result.resources] = result.response
	}

	elapsed := time.Since(start)
	fmt.Println(elapsed)
}
// result pairs a resource ID with the URL response generated for it, so
// a single collector goroutine can populate the map without locking.
type result struct {
resources string
response *customerPbV1.CustomerResponse
}
// worker consumes resource IDs from ch, sending successful results on r
// and wrapped failures on errCh. It returns when ch is closed.
func worker(ch chan string, r chan result, errCh chan error) {
	for w := range ch {
		customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
		if err != nil {
			// BUG FIX: the parameter was named `errs`, shadowing the errs
			// package and breaking the errs.NewWithCausef call; it also
			// referenced the undefined `resources` instead of `w`.
			errCh <- errs.NewWithCausef(err, "Could not generate the url for %s", w)
			continue
		}
		r <- result{w, customerUrl}
	}
}
我已经创建了带有注释的示例代码。 请阅读评论。
note: query function will sleep in 1 second.
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"runtime"
"strconv"
"sync"
"time"
)
// Result carries one job's outcome through the pipeline: the resource
// ID, its computed value, and the error (if any) so failures don't stop
// other jobs.
type Result struct {
resource string
val int
err error
}
/*
CHANGE Result struct to this
result struct will collect all you need to create map
type Result struct {
resources string
customerUrl *customerPbV1.CustomerResponse
err error
}
*/
// const numWorker = 8
// main demonstrates a bounded worker pool: a producer feeds 20 jobs, a
// pool of runtime.NumCPU() workers processes them concurrently, and a
// single collector goroutine builds the result map (no mutex needed).
func main() {
	now := time.Now()
	rand.Seed(time.Now().UnixNano())

	m := make(map[string]int)
	// m := make(map[string]*customerPbV1.CustomerResponse) // CHANGE TO THIS

	numWorker := runtime.NumCPU()
	fmt.Println(numWorker)
	chanResult := make(chan Result)

	// Producer: push the 20 jobs, then close so workers' range loops end.
	go func() {
		for i := 0; i < 20; i++ {
			/*
				customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
				we assume i is resources
				chanResult <- Result{resource: strconv.Itoa(i)}
			*/
			// Each send blocks until a worker receives it, which naturally
			// throttles the producer to the pool's pace.
			chanResult <- Result{
				resource: strconv.Itoa(i),
			}
		}
		close(chanResult)
	}()

	var wg sync.WaitGroup
	cr := make(chan Result)
	wg.Add(numWorker)
	// Close cr only after every worker has finished so the collector
	// loop below terminates. NOTE: don't forget to close cr.
	go func() {
		wg.Wait()
		close(cr)
	}()

	// Worker pool: numWorker goroutines drain chanResult concurrently.
	go func() {
		for i := 0; i < numWorker; i++ {
			go func(workerID int) {
				for job := range chanResult {
					log.Println("worker", workerID, "working on", job.resource)
					// BUG FIX: the original wrote `x, err := query(...)`,
					// shadowing the worker index `x`; use a distinct name.
					val, err := query(job.resource) // TODO: customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
					// Send blocks until the collector in main consumes it.
					cr <- Result{
						resource: job.resource,
						val:      val,
						err:      err,
					}
				}
				wg.Done()
			}(i)
		}
	}()

	counterTotal := 0
	counterSuccess := 0
	// Collector: the only goroutine touching the map, so no lock needed.
	for res := range cr {
		if res.err != nil {
			log.Printf("error found %s. stack trace: %s", res.resource, res.err)
		} else {
			m[res.resource] = res.val // NOTE: save to map
			counterSuccess++
		}
		counterTotal++
	}

	log.Printf("%d/%d of total job run", counterSuccess, counterTotal)
	fmt.Println("final :", m)
	fmt.Println("len m", len(m))
	fmt.Println(runtime.NumGoroutine())
	fmt.Println(time.Since(now))
}
// query simulates a slow lookup for resource s: it sleeps one second,
// fails for non-numeric input and for multiples of three, and otherwise
// returns a pseudo-random value derived from s.
func query(s string) (int, error) {
	time.Sleep(time.Second)

	n, err := strconv.Atoi(s)
	if err != nil {
		return 0, err
	}

	if n%3 == 0 {
		return 0, errors.New("i divided by 3")
	}

	return n + 500 + rand.Intn(500), nil
}
这里是纯通道(channel)解决方案(playground)。
我认为性能确实取决于 GenerateUrl
或我的代码 generateURL
。
我还想指出的另一件事是,正确的术语是 concurrency not parallelism.
package main
import (
"errors"
"log"
"strconv"
"strings"
)
// result carries one resource's outcome to the collector: its ID, the
// generated URL, and the error (if any) so failures don't stop others.
type result struct {
resourceID, url string
err error
}
// generateURL returns the URL for resourceID, failing for IDs that
// carry the "error-" prefix (used here to simulate generation failures).
func generateURL(resourceID string) (string, error) {
	if !strings.HasPrefix(resourceID, "error-") {
		return resourceID, nil
	}
	return "", errors.New(resourceID)
}
// main demonstrates a pure-channel fan-out: 20 worker goroutines each
// own an input channel, results funnel back on ch, and a done channel
// tells the workers to exit once everything has been processed.
func main() {
	// Build the test resource IDs: every 10th one is rigged to fail.
	resources := make([]string, 10000)
	for i := 0; i < 10000; i++ {
		s := strconv.Itoa(i)
		if i%10 == 0 {
			resources[i] = "error-" + s
		} else {
			resources[i] = "id-" + s
		}
	}

	numOfChannel := 20

	// Results flow through ch to the single map-owning loop below.
	ch := make(chan result, 10)
	// Each worker goroutine receives resource IDs on its own channel.
	channels := make([]chan string, numOfChannel)
	// Closed after all resources are processed to signal workers to exit.
	done := make(chan struct{})

	for i := range channels {
		c := make(chan string)
		channels[i] = c
		go func() {
			for {
				select {
				case rid := <-c:
					u, err := generateURL(rid)
					ch <- result{rid, u, err}
				case <-done:
					// BUG FIX: the original used `break` here, which only
					// exits the select statement, not the for loop — every
					// worker goroutine leaked. Return instead.
					return
				}
			}
		}()
	}

	// Distribute the work round-robin across the worker channels.
	go func() {
		for i, r := range resources {
			channels[i%numOfChannel] <- r
		}
	}()

	resourceMap := make(map[string]string)
	i := 0
	for p := range ch {
		if p.err != nil {
			log.Println(p.resourceID, p.err)
		} else {
			resourceMap[p.resourceID] = p.url
		}
		i++
		// BUG FIX: the original stopped at len(resources)-1, dropping the
		// final result and leaving one worker blocked on its send.
		if i == len(resources) {
			break
		}
	}
	close(done)
}