如何从从该通道接收数据的 goroutine 添加一个对象到该通道?
How do I add an object to a channel from a goroutine that is receiving data from that channel?
基本上,我正在尝试使用 goroutines 编写并发站点地图爬虫。一个站点地图可以包含指向多个站点地图的链接,这些站点地图可以包含指向其他站点地图等的链接。
现在,这是我的设计:
worker:
- receives url from channel
- processesUrl(url)
processUrl:
for each link in lookup(url):
- if link is sitemap:
channel <- url
else:
print(url)
main:
- create 10 workers
- chanel <- root url
问题是 worker 在 processUrl() 完成之前不会从通道接受新的 url,而 processUrl 在 worker 从频道,如果它正在向频道添加 url。我可以使用什么并发设计将 url 添加到没有通道且没有忙等待或没有等待 channel <- url
的任务队列?
下面是实际代码,如果有帮助的话:
func (c *SitemapCrawler) worker() {
for {
select {
case url := <-urlChan:
fmt.Println(url)
c.crawlSitemap(url)
}
}
}
func crawlUrl(url string) {
defer crawlWg.Done()
crawler := NewCrawler(url)
for i := 0; i < MaxCrawlRate*20; i++ {
go crawler.worker()
}
crawler.getSitemaps()
pretty.Println(crawler.sitemaps)
crawler.crawlSitemaps()
}
func (c SitemapCrawler) crawlSitemap(url string) {
c.limiter.Take()
resp, err := MakeRequest(url)
if err != nil || resp.StatusCode != 200 {
crawlWg.Done()
return
}
var resp_txt []byte
if strings.Contains(resp.Header.Get("Content-Type"), "html") {
crawlWg.Done()
return
} else if strings.Contains(url, ".gz") || resp.Header.Get("Content-Encoding") == "gzip" {
reader, err := gzip.NewReader(resp.Body)
if err != nil {
crawlWg.Done()
panic(err)
} else {
resp_txt, err = ioutil.ReadAll(reader)
if err != nil {
crawlWg.Done()
panic(err)
}
}
reader.Close()
} else {
resp_txt, err = ioutil.ReadAll(resp.Body)
if err != nil {
//panic(err)
crawlWg.Done()
return
}
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
d, err := libxml2.ParseString(string(resp_txt))
if err != nil {
crawlWg.Done()
return
}
results, err := d.Find("//*[contains(local-name(), 'loc')]")
if err != nil {
crawlWg.Done()
return
}
locs := results.NodeList()
printLock.Lock()
for i := 0; i < len(locs); i++ {
newUrl := locs[i].TextContent()
if strings.Contains(newUrl, ".xml") {
crawlWg.Add(1)
//go c.crawlSitemap(newUrl)
urlChan <- newUrl
} else {
fmt.Println(newUrl)
}
}
printLock.Unlock()
crawlWg.Done()
}
当通道没有缓冲时,写入通道的操作正在阻塞。
创建缓冲通道:
urlChan := make(chan string, len(allUrls))
但是,当此通道已满时,写入操作将再次阻塞。
或者您可以使用开关。当写入 'fails' 时,它会立即下降到 default
select {
case urlChan <- url:
fmt.Println("received message")
default:
fmt.Println("no activity")
}
要在写入通道时超时,请执行以下操作
select {
case urlChan <- url:
fmt.Println("received message")
case <-time.After(5 * time.Second):
fmt.Println("timed out")
}
或者最后把write事件放在一个单独的go channel中
func write() {
urlChan <- url
}
go write()
基本上,我正在尝试使用 goroutines 编写并发站点地图爬虫。一个站点地图可以包含指向多个站点地图的链接,这些站点地图可以包含指向其他站点地图等的链接。
现在,这是我的设计:
worker:
- receives url from channel
- processesUrl(url)
processUrl:
for each link in lookup(url):
- if link is sitemap:
channel <- url
else:
print(url)
main:
- create 10 workers
- chanel <- root url
问题是 worker 在 processUrl() 完成之前不会从通道接受新的 url,而 processUrl 在 worker 从频道,如果它正在向频道添加 url。我可以使用什么并发设计将 url 添加到没有通道且没有忙等待或没有等待 channel <- url
的任务队列?
下面是实际代码,如果有帮助的话:
func (c *SitemapCrawler) worker() {
for {
select {
case url := <-urlChan:
fmt.Println(url)
c.crawlSitemap(url)
}
}
}
func crawlUrl(url string) {
defer crawlWg.Done()
crawler := NewCrawler(url)
for i := 0; i < MaxCrawlRate*20; i++ {
go crawler.worker()
}
crawler.getSitemaps()
pretty.Println(crawler.sitemaps)
crawler.crawlSitemaps()
}
func (c SitemapCrawler) crawlSitemap(url string) {
c.limiter.Take()
resp, err := MakeRequest(url)
if err != nil || resp.StatusCode != 200 {
crawlWg.Done()
return
}
var resp_txt []byte
if strings.Contains(resp.Header.Get("Content-Type"), "html") {
crawlWg.Done()
return
} else if strings.Contains(url, ".gz") || resp.Header.Get("Content-Encoding") == "gzip" {
reader, err := gzip.NewReader(resp.Body)
if err != nil {
crawlWg.Done()
panic(err)
} else {
resp_txt, err = ioutil.ReadAll(reader)
if err != nil {
crawlWg.Done()
panic(err)
}
}
reader.Close()
} else {
resp_txt, err = ioutil.ReadAll(resp.Body)
if err != nil {
//panic(err)
crawlWg.Done()
return
}
}
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
d, err := libxml2.ParseString(string(resp_txt))
if err != nil {
crawlWg.Done()
return
}
results, err := d.Find("//*[contains(local-name(), 'loc')]")
if err != nil {
crawlWg.Done()
return
}
locs := results.NodeList()
printLock.Lock()
for i := 0; i < len(locs); i++ {
newUrl := locs[i].TextContent()
if strings.Contains(newUrl, ".xml") {
crawlWg.Add(1)
//go c.crawlSitemap(newUrl)
urlChan <- newUrl
} else {
fmt.Println(newUrl)
}
}
printLock.Unlock()
crawlWg.Done()
}
当通道没有缓冲时,写入通道的操作正在阻塞。
创建缓冲通道:
urlChan := make(chan string, len(allUrls))
但是,当此通道已满时,写入操作将再次阻塞。
或者您可以使用开关。当写入 'fails' 时,它会立即下降到 default
select {
case urlChan <- url:
fmt.Println("received message")
default:
fmt.Println("no activity")
}
要在写入通道时超时,请执行以下操作
select {
case urlChan <- url:
fmt.Println("received message")
case <-time.After(5 * time.Second):
fmt.Println("timed out")
}
或者最后把write事件放在一个单独的go channel中
func write() {
urlChan <- url
}
go write()