去死锁所有 goroutines 睡着了

Go deadlock all goroutines asleep

这是我之前 post 的跟进:

    

在阅读了 SO 内外的多个主题和文章后,我仍然无法确定应该在何处关闭频道。

该程序将打开一个文件列表,为每个输入文件创建一个输出文件(具有相同的名称),访问每个输入文件中的所有 url 并从中获取所有 href 链接 - 这些链接保存到对应的输出文件。 但是,我收到以下错误:

    http://play.golang.org/p/8X-1rM3aXC

linkgetter、getHref函数主要是处理。 Head 和 tail 运行 作为单独的 goroutines,由 worker 进行处理。

    package main

    import (
    "bufio"
    "bytes"
    "fmt"
    "golang.org/x/net/html"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "regexp"
    "sync"
    )

    type Work struct {
    Link     string
    Filename string
    }

    type Output struct {
    Href     string
    Filename string
    }

    func getHref(t html.Token) (href string, ok bool) {
    // Iterate over all of the Token's attributes until we find an    "href"
    for _, a := range t.Attr {
            if a.Key == "href" {
                    href = a.Val
                    ok = true
            }
    }
    return
    }

    func linkGetter(out chan<- Output, r io.Reader, filename string) {
    z := html.NewTokenizer(r)
    for {
            tt := z.Next()
            switch {
            case tt == html.ErrorToken:
                    return
            case tt == html.StartTagToken:
                    t := z.Token()
                    isAnchor := t.Data == "a"
                    if !isAnchor {
                            continue
                    }

                    // Extract the href value, if there is one
                    url, ok := getHref(t)
                    if !ok {
                            continue
                    }

                    out <- Output{url, filename}
            }
    }
    }

    func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup)    {
    defer wg.Done()
    for work := range in {
            resp, err := http.Get(work.Link)
            if err != nil {
                    continue
            }
            body, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                    continue
            }
            if err = resp.Body.Close(); err != nil {
                    fmt.Println(err)
            }
            linkGetter(out, bytes.NewReader(body), work.Filename)
    }
    }

    func head(c chan<- Work) {
    r, _ := regexp.Compile("(.*)(?:.json)")
    files, _ := filepath.Glob("*.json")

    for _, elem := range files {
            res := r.FindStringSubmatch(elem)
            for k, v := range res {

                    if k == 0 {
                            outpath, _ :=  filepath.Abs(fmt.Sprintf("go_tester/%s", v))

                            abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
                            f, _ := os.Open(abspath)
                            scanner := bufio.NewScanner(f)

                            for scanner.Scan() {
                                    c <- Work{outpath, scanner.Text()}
                            }

                    }
            }

    }


    }

    func tail(c <-chan Output) {
    currentfile := ""
    var f *os.File
    var err error
    for out := range c {
            if out.Filename != currentfile {
                    if err = f.Close(); err != nil { // might cause an error on first run
                            fmt.Println(err)
                    }
                    f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
                    if err != nil {
                            log.Fatal(err)
                    }
                    currentfile = out.Filename
            }
            if _, err = f.WriteString(out.Href + "\n"); err != nil {
                    fmt.Println(err)
            }
    }

    }

    const (
    nworkers = 80
    )

    func main() {
    //fmt.Println("hi")
    in := make(chan Work)
    out := make(chan Output)

    go head(in)
    go tail(out)

    var wg sync.WaitGroup
    for i := 0; i < 85; i++ {
            wg.Add(1)
            go worker(out, in, &wg)
    }
    close(in)   
    close(out)    
    wg.Wait()


    }

通道关闭的方式有什么问题?

你并没有真正注意这里的管道设计。对于管道的每个部分,您都必须问自己 "When is section X done? What should happen when it is done? What happens after it is done?"。

您启动 headtailworker 以跨频道测距。这些功能要 return 成功的唯一方法是关闭这些通道。

根据需要画出来。

  1. head(in) 馈入 in
  2. worker(out, in, &wg) 范围超过 in,馈入 out,并告诉您一旦 in 关闭 wg 就完成了
  3. tail(out) 范围超过 out

那么你需要做什么来:

  1. 确保所有输入都已处理?
  2. 确保所有 goroutines return?

像这样:

  1. 一旦处理完所有文件,您需要从 head 关闭 in
  2. 这将导致 worker 实际上 return 一旦它可以从 in 获得的所有项目都得到处理,导致 wg.Wait() 到 return
  3. 现在关闭 out 是安全的,因为没有任何东西进入它,这将导致 tail 最终 return。

但是您可能需要另一个与 tail 关联的 sync.WaitGroup 用于此特定设计,因为整个程序将在 wg.Wait() return 时退出,因此可能没有完成 tail 正在做的所有工作。 See here。具体来说:

Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.

您可能还想使用缓冲通道 referenced here 来帮助避免在 goroutine 之间切换执行太多。使用您当前的设计,您在上下文切换上浪费了很多时间。