How to ensure goroutines launched within goroutines are synchronized with each other?
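This is my first time using Go's concurrency features, and I'm diving right in.
I want to make concurrent calls to an API. The requests are based on the tags of the posts I want to receive (there can be 1 <= N tags). The response body looks like this: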
```
{
  "posts": [
    {
      "id": 1,
      "author": "Name",
      "authorId": 1,
      "likes": num_likes,
      "popularity": popularity_decimal,
      "reads": num_reads,
      "tags": [ "tag1", "tag2" ]
    },
    ...
  ]
}
```
My plan is to daisy-chain a bunch of channels together and spawn some goroutines that read from and/or write to those channels:
- for each tag, add it to a tagsChannel inside a goroutine
- use that tagsChannel inside another goroutine to make concurrent GET requests to the endpoint
- for each response of that request, pass the underlying slice of posts into another goroutine
- for each individual post inside the slice of posts, add the post to a postChannel
- inside another goroutine, iterate over postChannel and insert each post into a data structure
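A minimal sketch of the first and last links of such a chain, assuming the usual Go convention that each stage owns, and closes, the channel it writes to. The stage names `emitTags` and `collect` are hypothetical, and the fetch stage in the middle is left out:

```go
package main

import "fmt"

// emitTags is a hypothetical first stage: it sends every tag on a new
// channel and closes that channel when done, because it is the only writer.
func emitTags(tags []string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out) // the sole producer closes its own output
		for _, t := range tags {
			out <- t
		}
	}()
	return out
}

// collect is a hypothetical last stage: its range loop ends only
// because the upstream stage closed the channel.
func collect(in <-chan string) []string {
	var got []string
	for t := range in {
		got = append(got, t)
	}
	return got
}

func main() {
	fmt.Println(collect(emitTags([]string{"tech", "history"})))
}
```

Because every channel has exactly one closer, each downstream `range` loop terminates on its own, without extra WaitGroup bookkeeping.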
Here's what I have so far:
```go
func (srv *server) Get() {
	// Using red-black tree prevents any duplicates, fast insertion
	// and retrieval times, and is sorted already on ID.
	rbt := tree.NewWithIntComparator()
	// concurrent approach
	tagChan := make(chan string)       // tags -> tagChan
	postChan := make(chan models.Post) // tagChan -> GET -> post -> postChan
	errChan := make(chan error)        // for synchronizing errors across goroutines
	wg := &sync.WaitGroup{}            // for synchronizing goroutines
	wg.Add(4)
	// create a go func to synchronize our wait groups
	// once all goroutines are finished, we can close our errChan
	go func() {
		wg.Wait()
		close(errChan)
	}()
	go insertTags(tags, tagChan, wg)
	go fetch(postChan, tagChan, errChan, wg)
	go addPostToTree(rbt, postChan, wg)
	for err := range errChan {
		if err != nil {
			srv.HandleError(err, http.StatusInternalServerError).ServeHTTP(w, r)
		}
	}
}

// insertTags inserts user's passed-in tags to tagChan
// so that tagChan may pass those along in fetch.
func insertTags(tags []string, tagChan chan<- string, group *sync.WaitGroup) {
	defer group.Done()
	for _, tag := range tags {
		tagChan <- tag
	}
	close(tagChan)
}

// fetch completes a GET request to the endpoint
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
	defer group.Done()
	for tag := range tags {
		ep, err := formURL(tag)
		if err != nil {
			errs <- err
		}
		group.Add(1) // QUESTION should I use a separate wait group here?
		go func() {
			resp, err := http.Get(ep.String())
			if err != nil {
				errs <- err
			}
			container := models.PostContainer{}
			err = json.NewDecoder(resp.Body).Decode(&container)
			defer resp.Body.Close()
			group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
			go insertPosts(posts, container.Posts, group)
			defer group.Done()
		}()
		// group.Done() -- removed this call due to Burak, but now my program hangs
	}
}

// insertPosts inserts each individual post into our posts channel so that they may be
// concurrently added to our RBT.
func insertPosts(posts chan<- models.Post, container []models.Post, group *sync.WaitGroup) {
	defer group.Done()
	for _, post := range container {
		posts <- post
	}
}

// addPostToTree iterates over the channel and
// inserts each individual post into our RBT,
// setting the post ID as the node's key.
func addPostToTree(tree *tree.RBT, collection <-chan models.Post, group *sync.WaitGroup) {
	defer group.Done()
	for post := range collection {
		// ignore return value & error here:
		// we don't care about the returned key and
		// error is only ever if a duplicate is attempted to be added -- we don't care
		tree.Insert(post.ID, post)
	}
}
```
I'm able to make one request to the endpoint, but as soon as I try to submit a second request, my program fails with `panic: sync: negative WaitGroup counter`.
My question is: why does my WaitGroup counter go negative? I make sure to add to the wait group and to mark when my goroutines are done.
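If the wait group goes negative on the second request, that must mean the first time I allocated the wait group and added 4 to it was skipped... why? Could this have something to do with closing channels? If so, where should I close them?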
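For reference, the panic itself does not depend on scheduling: `sync.WaitGroup` panics as soon as `Done` is called more times than `Add` has added. A minimal sketch (the helper name `doneTooOften` is hypothetical) that recovers the panic so its value can be inspected:

```go
package main

import (
	"fmt"
	"sync"
)

// doneTooOften calls Done one more time than Add allows and returns
// the recovered panic value.
func doneTooOften() (recovered any) {
	defer func() { recovered = recover() }()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // the counter would drop to -1, so this call panics
	return nil
}

func main() {
	fmt.Println(doneTooOften()) // sync: negative WaitGroup counter
}
```

So a negative counter always means some code path called `Done` without a matching `Add`, not that the initial `Add(4)` was skipped.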
Also, does anyone have tips for debugging goroutines?
Thanks for your help.
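One general technique (not taken from the answer below) is to dump the stack of every live goroutine with the standard library's `runtime.Stack`; a goroutine stuck in a `range` over a channel that is never closed shows up in the dump with the status `chan receive`. Sending SIGQUIT to a running Go program (Ctrl+\ in a Unix terminal) also makes it exit with a stack dump. A minimal sketch; the helper name `dumpGoroutines` is hypothetical:

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// dumpGoroutines returns the stack traces of all live goroutines.
func dumpGoroutines() string {
	buf := make([]byte, 1<<20)
	n := runtime.Stack(buf, true) // true = include every goroutine, not just this one
	return string(buf[:n])
}

func main() {
	ch := make(chan int)
	go func() {
		for range ch { // never exits: ch is never closed
		}
	}()
	time.Sleep(100 * time.Millisecond) // give the goroutine time to block
	fmt.Println("goroutines:", runtime.NumGoroutine())
	fmt.Println(dumpGoroutines())
}
```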
First of all, the whole design is rather complicated. I'll share my thoughts on that at the end.
There are 2 problems in your code:
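- The `posts` channel is never closed, so the `range` loop in `addPostToTree` may never exit, which means one WaitGroup slot is never released (in your case, the program hangs). The program can end up waiting forever in a deadlock: each goroutine expects another one to release it, but all of them are stuck.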
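Fix: close the `postChan` channel. But how? The usual advice is that the producer should be the one to close a channel, but here you have multiple producers. So the best option is to wait for all producers to finish and then close the channel. To wait for all of them, create another WaitGroup and use it to track the child goroutines.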
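Code:
```go
// fetch completes a GET request to the endpoint
func fetch(posts chan<- models.Post, tags <-chan string, errs chan<- error, group *sync.WaitGroup) {
	// postsWG tracks only the goroutines started by fetch (its children).
	postsWG := &sync.WaitGroup{}
	defer func() {
		postsWG.Wait() // wait until every child producer has finished sending
		close(posts)   // only then is it safe to close posts
		group.Done()   // finally, report to the parent WaitGroup
	}()
	for tag := range tags {
		ep, err := formURL(tag)
		if err != nil {
			errs <- err
			continue // no request without a valid URL
		}
		postsWG.Add(1) // one child per tag, tracked by fetch's own WaitGroup
		go func() {
			resp, err := http.Get(ep.String())
			if err != nil {
				errs <- err
				postsWG.Done() // insertPosts will not run, so release the slot here
				return
			}
			defer resp.Body.Close()
			container := models.PostContainer{}
			if err := json.NewDecoder(resp.Body).Decode(&container); err != nil {
				errs <- err
				postsWG.Done()
				return
			}
			go insertPosts(posts, container.Posts, postsWG) // insertPosts calls postsWG.Done()
		}()
	}
}
```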
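The same close-after-all-producers pattern, reduced to a self-contained sketch with plain ints instead of `models.Post` (the names `produceAll` and `batches` are hypothetical): several producers share one output channel, a dedicated WaitGroup counts them, and one extra goroutine closes the channel only after `Wait` returns.

```go
package main

import (
	"fmt"
	"sync"
)

// produceAll starts one producer per batch; all of them write to out.
// No single producer knows it is the last one, so a separate goroutine
// waits for all of them and then closes out.
func produceAll(batches [][]int) <-chan int {
	out := make(chan int)
	var producers sync.WaitGroup
	for _, batch := range batches {
		producers.Add(1) // Add before starting the goroutine, never inside it
		go func(b []int) {
			defer producers.Done()
			for _, v := range b {
				out <- v
			}
		}(batch)
	}
	go func() {
		producers.Wait() // every producer has finished sending
		close(out)       // now the consumer's range loop can end
	}()
	return out
}

func main() {
	sum := 0
	for v := range produceAll([][]int{{1, 2}, {3}, {4, 5, 6}}) {
		sum += v
	}
	fmt.Println(sum) // 21
}
```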
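- Now there is a second problem: the main WaitGroup should be initialized with 3 instead of 4. The main routine starts only 3 goroutines that call `Done` (`insertTags`, `fetch`, and `addPostToTree`), so it should track only those: `wg.Add(3)`. The child goroutines now use a different WaitGroup, so they are no longer the parent's concern.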
Code:
```go
errChan := make(chan error) // for synchronizing errors across goroutines
wg := &sync.WaitGroup{}     // for synchronizing goroutines
wg.Add(3)
// create a go func to synchronize our wait groups
// once all goroutines are finished, we can close our errChan
```
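TL;DR:
The design is complicated because the main WaitGroup is created in one place, but every goroutine modifies it as it sees fit. It has no single owner, which makes debugging and maintenance very hard (and makes it hard to be sure it is bug-free).
I suggest breaking it down and giving each goroutine that starts children its own tracker for them. That way, a caller that starts more goroutines only has to track its own children. It notifies its parent's WaitGroup only once it has finished and its children have finished, instead of having child goroutines notify the grandparent directly.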
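Also, in the `fetch` method, once you have made the HTTP call and decoded the response, why start yet another goroutine to handle the data?
```go
group.Add(1) // QUESTION should I add a separate wait group here and pass it to insertPosts?
go insertPosts(posts, container.Posts, group)
defer group.Done()
```
Either way, the work for that tag isn't finished until the posts have been sent, and the enclosing goroutine has nothing else to do in the meantime, so it could simply send them itself. As far as I can tell, the second goroutine is redundant.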
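A sketch of that simplification, with a hypothetical `Post` type and a stubbed `fetchPosts` function standing in for the HTTP call and JSON decoding: each per-tag goroutine sends its own posts, so there is one goroutine (and one `Add`/`Done` pair) per request instead of two.

```go
package main

import (
	"fmt"
	"sync"
)

// Post is a hypothetical stand-in for models.Post.
type Post struct {
	ID  int
	Tag string
}

// fetchPosts is a hypothetical stub for http.Get plus JSON decoding.
func fetchPosts(tag string) ([]Post, error) {
	return []Post{{ID: 1, Tag: tag}, {ID: len(tag), Tag: tag}}, nil
}

// fetchAll starts one goroutine per tag; each one sends its own posts,
// so no extra insertPosts goroutine is needed.
func fetchAll(tags []string, posts chan<- Post, errs chan<- error) {
	var wg sync.WaitGroup
	for _, tag := range tags {
		wg.Add(1)
		go func(tag string) {
			defer wg.Done()
			ps, err := fetchPosts(tag)
			if err != nil {
				errs <- err
				return
			}
			for _, p := range ps {
				posts <- p
			}
		}(tag)
	}
	wg.Wait()
	close(posts) // fetchAll owns all the producers, so it closes posts
}

func main() {
	tags := []string{"tech", "history"}
	posts := make(chan Post)
	errs := make(chan error, len(tags)) // room for one error per tag
	go fetchAll(tags, posts, errs)
	byID := map[int]Post{} // a map stands in for the red-black tree
	for p := range posts {
		byID[p.ID] = p
	}
	fmt.Println(len(byID)) // 3 unique IDs: 1, 4 and 7
}
```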