去死锁所有 goroutines 睡着了
Go deadlock all goroutines asleep
这是我之前 post 的跟进:
在阅读了 SO 内外的多个主题和文章后,我仍然无法确定应该在何处关闭频道。
该程序将打开一个文件列表,为每个输入文件创建一个输出文件(具有相同的名称),访问每个输入文件中的所有 url 并从中获取所有 href 链接 - 这些链接保存到对应的输出文件。
但是,我收到以下错误:
http://play.golang.org/p/8X-1rM3aXC
linkgetter、getHref函数主要是处理。 Head 和 tail 运行 作为单独的 goroutines,由 worker 进行处理。
package main
import (
"bufio"
"bytes"
"fmt"
"golang.org/x/net/html"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"regexp"
"sync"
)
type Work struct {
Link string
Filename string
}
type Output struct {
Href string
Filename string
}
func getHref(t html.Token) (href string, ok bool) {
// Iterate over all of the Token's attributes until we find an "href"
for _, a := range t.Attr {
if a.Key == "href" {
href = a.Val
ok = true
}
}
return
}
func linkGetter(out chan<- Output, r io.Reader, filename string) {
z := html.NewTokenizer(r)
for {
tt := z.Next()
switch {
case tt == html.ErrorToken:
return
case tt == html.StartTagToken:
t := z.Token()
isAnchor := t.Data == "a"
if !isAnchor {
continue
}
// Extract the href value, if there is one
url, ok := getHref(t)
if !ok {
continue
}
out <- Output{url, filename}
}
}
}
func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup) {
defer wg.Done()
for work := range in {
resp, err := http.Get(work.Link)
if err != nil {
continue
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
continue
}
if err = resp.Body.Close(); err != nil {
fmt.Println(err)
}
linkGetter(out, bytes.NewReader(body), work.Filename)
}
}
func head(c chan<- Work) {
r, _ := regexp.Compile("(.*)(?:.json)")
files, _ := filepath.Glob("*.json")
for _, elem := range files {
res := r.FindStringSubmatch(elem)
for k, v := range res {
if k == 0 {
outpath, _ := filepath.Abs(fmt.Sprintf("go_tester/%s", v))
abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
f, _ := os.Open(abspath)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
c <- Work{outpath, scanner.Text()}
}
}
}
}
}
func tail(c <-chan Output) {
currentfile := ""
var f *os.File
var err error
for out := range c {
if out.Filename != currentfile {
if err = f.Close(); err != nil { // might cause an error on first run
fmt.Println(err)
}
f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Fatal(err)
}
currentfile = out.Filename
}
if _, err = f.WriteString(out.Href + "\n"); err != nil {
fmt.Println(err)
}
}
}
const (
nworkers = 80
)
func main() {
//fmt.Println("hi")
in := make(chan Work)
out := make(chan Output)
go head(in)
go tail(out)
var wg sync.WaitGroup
for i := 0; i < 85; i++ {
wg.Add(1)
go worker(out, in, &wg)
}
close(in)
close(out)
wg.Wait()
}
通道关闭的方式有什么问题?
你并没有真正注意这里的管道设计。对于管道的每个部分,您都必须问自己 "When is section X done? What should happen when it is done? What happens after it is done?"。
您启动 head
、tail
和 worker
以跨频道测距。这些功能要 return 成功的唯一方法是关闭这些通道。
根据需要画出来。
head(in)
馈入 in
worker(out, in, &wg)
范围超过 in
,馈入 out
,并告诉您一旦 in
关闭 wg
就完成了
tail(out)
范围超过 out
那么你需要做什么来:
- 确保所有输入都已处理?
- 确保所有 goroutines return?
像这样:
- 一旦处理完所有文件,您需要从
head
关闭 in
。
- 这将导致
worker
实际上 return 一旦它可以从 in
获得的所有项目都得到处理,导致 wg.Wait()
到 return
- 现在关闭
out
是安全的,因为没有任何东西进入它,这将导致 tail
最终 return。
但是您可能需要另一个与 tail
关联的 sync.WaitGroup
用于此特定设计,因为整个程序将在 wg.Wait()
return 时退出,因此可能没有完成 tail
正在做的所有工作。 See here。具体来说:
Program execution begins by initializing the main package and then
invoking the function main. When that function invocation returns, the
program exits. It does not wait for other (non-main) goroutines to
complete.
您可能还想使用缓冲通道 referenced here 来帮助避免在 goroutine 之间切换执行太多。使用您当前的设计,您在上下文切换上浪费了很多时间。
这是我之前 post 的跟进:
在阅读了 SO 内外的多个主题和文章后,我仍然无法确定应该在何处关闭频道。
该程序将打开一个文件列表,为每个输入文件创建一个输出文件(具有相同的名称),访问每个输入文件中的所有 url 并从中获取所有 href 链接 - 这些链接保存到对应的输出文件。 但是,我收到以下错误:
http://play.golang.org/p/8X-1rM3aXC
linkgetter、getHref函数主要是处理。 Head 和 tail 运行 作为单独的 goroutines,由 worker 进行处理。
package main
import (
"bufio"
"bytes"
"fmt"
"golang.org/x/net/html"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"regexp"
"sync"
)
type Work struct {
Link string
Filename string
}
type Output struct {
Href string
Filename string
}
func getHref(t html.Token) (href string, ok bool) {
// Iterate over all of the Token's attributes until we find an "href"
for _, a := range t.Attr {
if a.Key == "href" {
href = a.Val
ok = true
}
}
return
}
func linkGetter(out chan<- Output, r io.Reader, filename string) {
z := html.NewTokenizer(r)
for {
tt := z.Next()
switch {
case tt == html.ErrorToken:
return
case tt == html.StartTagToken:
t := z.Token()
isAnchor := t.Data == "a"
if !isAnchor {
continue
}
// Extract the href value, if there is one
url, ok := getHref(t)
if !ok {
continue
}
out <- Output{url, filename}
}
}
}
func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup) {
defer wg.Done()
for work := range in {
resp, err := http.Get(work.Link)
if err != nil {
continue
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
continue
}
if err = resp.Body.Close(); err != nil {
fmt.Println(err)
}
linkGetter(out, bytes.NewReader(body), work.Filename)
}
}
func head(c chan<- Work) {
r, _ := regexp.Compile("(.*)(?:.json)")
files, _ := filepath.Glob("*.json")
for _, elem := range files {
res := r.FindStringSubmatch(elem)
for k, v := range res {
if k == 0 {
outpath, _ := filepath.Abs(fmt.Sprintf("go_tester/%s", v))
abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
f, _ := os.Open(abspath)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
c <- Work{outpath, scanner.Text()}
}
}
}
}
}
func tail(c <-chan Output) {
currentfile := ""
var f *os.File
var err error
for out := range c {
if out.Filename != currentfile {
if err = f.Close(); err != nil { // might cause an error on first run
fmt.Println(err)
}
f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Fatal(err)
}
currentfile = out.Filename
}
if _, err = f.WriteString(out.Href + "\n"); err != nil {
fmt.Println(err)
}
}
}
const (
nworkers = 80
)
func main() {
//fmt.Println("hi")
in := make(chan Work)
out := make(chan Output)
go head(in)
go tail(out)
var wg sync.WaitGroup
for i := 0; i < 85; i++ {
wg.Add(1)
go worker(out, in, &wg)
}
close(in)
close(out)
wg.Wait()
}
通道关闭的方式有什么问题?
你并没有真正注意这里的管道设计。对于管道的每个部分,您都必须问自己 "When is section X done? What should happen when it is done? What happens after it is done?"。
您启动 head
、tail
和 worker
以跨频道测距。这些功能要 return 成功的唯一方法是关闭这些通道。
根据需要画出来。
head(in)
馈入in
worker(out, in, &wg)
范围超过in
,馈入out
,并告诉您一旦in
关闭wg
就完成了tail(out)
范围超过out
那么你需要做什么来:
- 确保所有输入都已处理?
- 确保所有 goroutines return?
像这样:
- 一旦处理完所有文件,您需要从
head
关闭in
。 - 这将导致
worker
实际上 return 一旦它可以从in
获得的所有项目都得到处理,导致wg.Wait()
到 return - 现在关闭
out
是安全的,因为没有任何东西进入它,这将导致tail
最终 return。
但是您可能需要另一个与 tail
关联的 sync.WaitGroup
用于此特定设计,因为整个程序将在 wg.Wait()
return 时退出,因此可能没有完成 tail
正在做的所有工作。 See here。具体来说:
Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.
您可能还想使用缓冲通道 referenced here 来帮助避免在 goroutine 之间切换执行太多。使用您当前的设计,您在上下文切换上浪费了很多时间。