Go 中的网络爬虫
Webcrawler in Go
我正在尝试在 Go 中构建一个网络爬虫,我想在其中指定并发工作人员的最大数量。只要队列中有 link 待探索,它们都会工作。当队列中的元素少于工作人员时,工作人员应大声喊叫,但如果发现更多 link 则恢复。
我试过的密码是
const max_workers = 6
// simulating links with int
func crawl(wg *sync.WaitGroup, queue chan int) {
for element := range queue {
wg.Done() // why is defer here causing a deadlock?
fmt.Println("adding 2 new elements ")
if element%2 == 0 {
wg.Add(2)
queue <- (element*100 + 11)
queue <- (element*100 + 33)
}
}
}
func main() {
var wg sync.WaitGroup
queue := make(chan int, 10)
queue <- 0
queue <- 1
queue <- 2
queue <- 3
var min int
if (len(queue) < max_workers) {
min = len(queue)
} else {
min = max_workers
}
for i := 0; i < min; i++ {
wg.Add(1)
go crawl(&wg, queue)
}
wg.Wait()
close(queue)
}
这似乎可行,但有一个问题:开始时我必须用多个元素填充队列。我希望它从一个(单个)种子页面(在我的示例 queue <- 0
中)开始,然后动态地增长/缩小工作池。
我的问题是:
如何获得行为?
为什么 defer wg.Done()
导致死锁? wg.Done()
函数真正完成时正常吗?我认为没有 defer
goroutine 不会等待另一部分完成(在解析 HTML 的实际工作示例中可能需要更长的时间)。
如果您使用自己喜欢的网络搜索 "Go web crawler"(或 "golang web crawler")
你会发现很多例子,包括:
Go Tour Exercise: Web Crawler。
在 Go 中也有一些关于并发的讨论涵盖了这种事情。
Go 中的 "standard" 方法完全不需要等待组。
要回答您的一个问题,使用 defer
排队的事物在函数 return 时只会得到 运行。你有一个很长的 运行ning 函数,所以不要在这样的循环中使用 defer
。
"standard" 方法是在自己的 goroutine 中启动任意数量的 worker。
他们都从同一个频道读"jobs",阻塞if/when无计可施。
完全完成后,该频道将关闭,他们都会退出。
在爬虫之类的情况下,工作人员会发现更多 "jobs" 要做的事情,并希望将他们排入队列。
你不希望他们写回同一个通道,因为它会有一些有限的缓冲(或 none!),你最终会阻止所有试图排队更多工作的工人!
一个简单的解决方案是使用单独的通道
(例如每个工人有 in <-chan Job, out chan<- Job
)
以及读取这些请求的单个 queue/filter goroutine,
将它们附加到一个切片上,它可以让其增长任意大或对其进行一些全局限制,
并且还从切片的头部馈送另一个通道
(即一个简单的 for-select 循环,从一个通道读取并写入另一个通道)。
此代码通常还负责跟踪已完成的操作
(例如访问过的 URL 的地图)并丢弃传入的重复请求。
队列 goroutine 可能看起来像这样(这里的参数名称过于冗长):
type Job string
func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
var list []Job
done := make(map[Job]bool)
for {
var send chan<- Job
var item Job
if len(list) > 0 {
send = toWorkers
item = list[0]
}
select {
case send <- item:
// We sent an item, remove it
list = list[1:]
case thing := <-fromWorkers:
// Got a new thing
if !done[thing] {
list = append(list, thing)
done[thing] = true
}
}
}
}
在这个简单的示例中掩盖了一些事情。
比如终止。如果 "Jobs" 是一些更大的结构,您希望在其中使用 chan *Job
和 []*Job
。
在这种情况下,您还需要将地图类型更改为您从作业中提取的某个键
(例如 Job.URL
也许)
并且您想在 list = list[1:]
之前执行 list[0] = nil
以摆脱对 *Job
指针的引用,并让垃圾收集器更早地处理它。
编辑:关于完全终止的一些注意事项。
有几种方法可以干净地终止上述代码。可以使用等待组,但需要仔细放置 Add/Done 调用,您可能需要另一个 goroutine 来执行等待(然后关闭其中一个通道以开始关闭)。工作人员不应该关闭他们的输出通道,因为有多个工作人员并且您不能多次关闭一个通道;队列 goroutine 在不知道工作人员何时完成的情况下无法判断何时关闭它与工作人员的通道。
过去,当我使用与上面非常相似的代码时,我在 "queue" goroutine 中使用了一个本地 "outstanding" 计数器(这避免了对互斥体的任何需要或任何同步开销一个等待组)。将工作发送给工人时,未完成工作的数量会增加。当工作人员说完成时,它又减少了。我的代码碰巧有另一个通道(我的 "queue" 除了要排队的其他节点外,还在收集结果)。它在自己的通道上可能更干净,但可以使用现有通道上的特殊值(例如 nil Job 指针)。无论如何,有了这样一个计数器,本地列表上现有的长度检查只需要在列表为空时看到没有任何未完成的,就该终止了;只需关闭通往工作人员的频道和 return.
例如:
if len(list) > 0 {
send = toWorkers
item = list[0]
} else if outstandingJobs == 0 {
close(toWorkers)
return
}
我利用 Go 的互斥 (Mutex) 功能编写了一个解决方案。
当它以并发方式运行时,限制一次只有一个实例访问 url 映射可能很重要。我相信我按照下面的描述实现了它。请随时尝试一下。感谢您的反馈,因为我也会从您的评论中学习。
package main
import (
"fmt"
"sync"
)
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// ! SafeUrlBook helps restrict only one instance access the central url map at a time. So that no redundant crawling should occur.
type SafeUrlBook struct {
book map[string]bool
mux sync.Mutex
}
func (sub *SafeUrlBook) doesThisExist(url string) bool {
sub.mux.Lock()
_ , key_exists := sub.book[url]
defer sub.mux.Unlock()
if key_exists {
return true
} else {
sub.book[url] = true
return false
}
}
// End SafeUrlBook
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
// Note that now I use safeBook (SafeUrlBook) to keep track of which url has been visited by a crawler.
func Crawl(url string, depth int, fetcher Fetcher, safeBook SafeUrlBook) {
if depth <= 0 {
return
}
exist := safeBook.doesThisExist(url)
if exist { fmt.Println("Skip", url) ; return }
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
Crawl(u, depth-1, fetcher, safeBook)
}
return
}
func main() {
safeBook := SafeUrlBook{book: make(map[string]bool)}
Crawl("https://golang.org/", 4, fetcher, safeBook)
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
},
},
"https://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
"https://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
}
我正在尝试在 Go 中构建一个网络爬虫,我想在其中指定并发工作人员的最大数量。只要队列中有 link 待探索,它们都会工作。当队列中的元素少于工作人员时,工作人员应大声喊叫,但如果发现更多 link 则恢复。
我试过的密码是
const max_workers = 6
// simulating links with int
func crawl(wg *sync.WaitGroup, queue chan int) {
for element := range queue {
wg.Done() // why is defer here causing a deadlock?
fmt.Println("adding 2 new elements ")
if element%2 == 0 {
wg.Add(2)
queue <- (element*100 + 11)
queue <- (element*100 + 33)
}
}
}
func main() {
var wg sync.WaitGroup
queue := make(chan int, 10)
queue <- 0
queue <- 1
queue <- 2
queue <- 3
var min int
if (len(queue) < max_workers) {
min = len(queue)
} else {
min = max_workers
}
for i := 0; i < min; i++ {
wg.Add(1)
go crawl(&wg, queue)
}
wg.Wait()
close(queue)
}
这似乎可行,但有一个问题:开始时我必须用多个元素填充队列。我希望它从一个(单个)种子页面(在我的示例 queue <- 0
中)开始,然后动态地增长/缩小工作池。
我的问题是:
如何获得行为?
为什么 defer
wg.Done()
导致死锁?wg.Done()
函数真正完成时正常吗?我认为没有defer
goroutine 不会等待另一部分完成(在解析 HTML 的实际工作示例中可能需要更长的时间)。
如果您使用自己喜欢的网络搜索 "Go web crawler"(或 "golang web crawler") 你会发现很多例子,包括: Go Tour Exercise: Web Crawler。 在 Go 中也有一些关于并发的讨论涵盖了这种事情。
Go 中的 "standard" 方法完全不需要等待组。
要回答您的一个问题,使用 defer
排队的事物在函数 return 时只会得到 运行。你有一个很长的 运行ning 函数,所以不要在这样的循环中使用 defer
。
"standard" 方法是在自己的 goroutine 中启动任意数量的 worker。 他们都从同一个频道读"jobs",阻塞if/when无计可施。 完全完成后,该频道将关闭,他们都会退出。
在爬虫之类的情况下,工作人员会发现更多 "jobs" 要做的事情,并希望将他们排入队列。 你不希望他们写回同一个通道,因为它会有一些有限的缓冲(或 none!),你最终会阻止所有试图排队更多工作的工人!
一个简单的解决方案是使用单独的通道
(例如每个工人有 in <-chan Job, out chan<- Job
)
以及读取这些请求的单个 queue/filter goroutine,
将它们附加到一个切片上,它可以让其增长任意大或对其进行一些全局限制,
并且还从切片的头部馈送另一个通道
(即一个简单的 for-select 循环,从一个通道读取并写入另一个通道)。
此代码通常还负责跟踪已完成的操作
(例如访问过的 URL 的地图)并丢弃传入的重复请求。
队列 goroutine 可能看起来像这样(这里的参数名称过于冗长):
type Job string
func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
var list []Job
done := make(map[Job]bool)
for {
var send chan<- Job
var item Job
if len(list) > 0 {
send = toWorkers
item = list[0]
}
select {
case send <- item:
// We sent an item, remove it
list = list[1:]
case thing := <-fromWorkers:
// Got a new thing
if !done[thing] {
list = append(list, thing)
done[thing] = true
}
}
}
}
在这个简单的示例中掩盖了一些事情。
比如终止。如果 "Jobs" 是一些更大的结构,您希望在其中使用 chan *Job
和 []*Job
。
在这种情况下,您还需要将地图类型更改为您从作业中提取的某个键
(例如 Job.URL
也许)
并且您想在 list = list[1:]
之前执行 list[0] = nil
以摆脱对 *Job
指针的引用,并让垃圾收集器更早地处理它。
编辑:关于完全终止的一些注意事项。
有几种方法可以干净地终止上述代码。可以使用等待组,但需要仔细放置 Add/Done 调用,您可能需要另一个 goroutine 来执行等待(然后关闭其中一个通道以开始关闭)。工作人员不应该关闭他们的输出通道,因为有多个工作人员并且您不能多次关闭一个通道;队列 goroutine 在不知道工作人员何时完成的情况下无法判断何时关闭它与工作人员的通道。
过去,当我使用与上面非常相似的代码时,我在 "queue" goroutine 中使用了一个本地 "outstanding" 计数器(这避免了对互斥体的任何需要或任何同步开销一个等待组)。将工作发送给工人时,未完成工作的数量会增加。当工作人员说完成时,它又减少了。我的代码碰巧有另一个通道(我的 "queue" 除了要排队的其他节点外,还在收集结果)。它在自己的通道上可能更干净,但可以使用现有通道上的特殊值(例如 nil Job 指针)。无论如何,有了这样一个计数器,本地列表上现有的长度检查只需要在列表为空时看到没有任何未完成的,就该终止了;只需关闭通往工作人员的频道和 return.
例如:
if len(list) > 0 {
send = toWorkers
item = list[0]
} else if outstandingJobs == 0 {
close(toWorkers)
return
}
我利用 Go 的互斥 (Mutex) 功能编写了一个解决方案。
当它以并发方式运行时,限制一次只有一个实例访问 url 映射可能很重要。我相信我按照下面的描述实现了它。请随时尝试一下。感谢您的反馈,因为我也会从您的评论中学习。
package main
import (
"fmt"
"sync"
)
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// ! SafeUrlBook helps restrict only one instance access the central url map at a time. So that no redundant crawling should occur.
type SafeUrlBook struct {
book map[string]bool
mux sync.Mutex
}
func (sub *SafeUrlBook) doesThisExist(url string) bool {
sub.mux.Lock()
_ , key_exists := sub.book[url]
defer sub.mux.Unlock()
if key_exists {
return true
} else {
sub.book[url] = true
return false
}
}
// End SafeUrlBook
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
// Note that now I use safeBook (SafeUrlBook) to keep track of which url has been visited by a crawler.
func Crawl(url string, depth int, fetcher Fetcher, safeBook SafeUrlBook) {
if depth <= 0 {
return
}
exist := safeBook.doesThisExist(url)
if exist { fmt.Println("Skip", url) ; return }
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
Crawl(u, depth-1, fetcher, safeBook)
}
return
}
func main() {
safeBook := SafeUrlBook{book: make(map[string]bool)}
Crawl("https://golang.org/", 4, fetcher, safeBook)
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
},
},
"https://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
"https://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
}