并发逐行读取文件
Reading a file line-by-line with concurrency
我想做什么
在 GetLine
中,我尝试使用 bufio.Scanner
逐行解析文件,并尝试进行并发处理。
在获取每一行中的文本之后,我通过 string
的通道将其发送给调用者(main
函数)。除了值,我还发送错误和完成标志(通过 done
通道)。因此,这应该能够在处理当前行时获取新行以在单独的 goroutine 中处理。
我实际做了什么
var READCOMPLETE = errors.New("Completed Reading")
func main() {
filename := flag.String("filename", "", "The file to parse")
flag.Parse()
if *filename == "" {
log.Fatal("Provide a file to parse")
}
fmt.Println("Getting file")
names := make(chan string)
readerr := make(chan error)
done := make(chan bool)
go GetLine(*filename, names, readerr, done)
for {
select {
case name := <-names:
// Process each line
fmt.Println(name)
case err := <-readerr:
log.Fatal(err)
case <-done:
// close(names)
// close(readerr)
break
}
}
fmt.Println("Processing Complete")
}
func GetLine(filename string, names chan string, readerr chan error, done chan bool) {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
names <- scanner.Text()
//fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
readerr <- err
}
done <- true
}
我得到了什么 运行
运行时错误:fatal error: all goroutines are asleep - deadlock!
我尝试修复了什么?
阅读 this 关于错误消息的回答后,我尝试在 select
语句的最后一个子句中关闭通道 names
和 readerr
,如注释。但是,该程序仍然崩溃并显示一条日志消息。我无法进一步修复它,非常感谢任何帮助。
欢迎提供学习资源。
P.S:我对 GoLang 比较陌生,仍在学习如何使用 Go 中的 CSP 并发模型。事实上,这是我第一次尝试编写同步并发程序。
select 中的 break 语句跳出 select。应用程序必须在完成后跳出 for 循环。使用标签跳出 for 循环:
loop:
for {
select {
case name := <-names:
// Process each line
fmt.Println(name)
case err := <-readerr:
log.Fatal(err)
case <-done:
// close(names)
// close(readerr)
break loop
}
}
可以通过删除 done 通道来简化代码。
func main() {
filename := flag.String("filename", "", "The file to parse")
flag.Parse()
if *filename == "" {
log.Fatal("Provide a file to parse")
}
fmt.Println("Getting file")
names := make(chan string)
readerr := make(chan error)
go GetLine(*filename, names, readerr)
loop:
for {
select {
case name := <-names:
// Process each line
fmt.Println(name)
case err := <-readerr:
if err != nil {
log.Fatal(err)
}
break loop
}
}
fmt.Println("Processing Complete")
}
func GetLine(filename string, names chan string, readerr chan error) {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
names <- scanner.Text()
}
readerr <- scanner.Err()
}
在这个具体的例子中,可以重组代码以将接收名称与接收错误分开。
func main() {
filename := flag.String("filename", "", "The file to parse")
flag.Parse()
if *filename == "" {
log.Fatal("Provide a file to parse")
}
fmt.Println("Getting file")
names := make(chan string)
readerr := make(chan error)
go GetLine(*filename, names, readerr)
for name := range names {
fmt.Println(name)
}
if err := <-readerr; err != nil {
log.Fatal(err)
}
fmt.Println("Processing Complete")
}
func GetLine(filename string, names chan string, readerr chan error) {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
names <- scanner.Text()
}
close(names) // close causes range on channel to break out of loop
readerr <- scanner.Err()
}
我想做什么
在 GetLine
中,我尝试使用 bufio.Scanner
逐行解析文件,并尝试进行并发处理。
在获取每一行中的文本之后,我通过 string
的通道将其发送给调用者(main
函数)。除了值,我还发送错误和完成标志(通过 done
通道)。因此,这应该能够在处理当前行时获取新行以在单独的 goroutine 中处理。
我实际做了什么
var READCOMPLETE = errors.New("Completed Reading")
func main() {
filename := flag.String("filename", "", "The file to parse")
flag.Parse()
if *filename == "" {
log.Fatal("Provide a file to parse")
}
fmt.Println("Getting file")
names := make(chan string)
readerr := make(chan error)
done := make(chan bool)
go GetLine(*filename, names, readerr, done)
for {
select {
case name := <-names:
// Process each line
fmt.Println(name)
case err := <-readerr:
log.Fatal(err)
case <-done:
// close(names)
// close(readerr)
break
}
}
fmt.Println("Processing Complete")
}
func GetLine(filename string, names chan string, readerr chan error, done chan bool) {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
names <- scanner.Text()
//fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
readerr <- err
}
done <- true
}
我得到了什么 运行
运行时错误:fatal error: all goroutines are asleep - deadlock!
我尝试修复了什么?
阅读 this 关于错误消息的回答后,我尝试在 select
语句的最后一个子句中关闭通道 names
和 readerr
,如注释。但是,该程序仍然崩溃并显示一条日志消息。我无法进一步修复它,非常感谢任何帮助。
欢迎提供学习资源。
P.S:我对 GoLang 比较陌生,仍在学习如何使用 Go 中的 CSP 并发模型。事实上,这是我第一次尝试编写同步并发程序。
select 中的 break 语句跳出 select。应用程序必须在完成后跳出 for 循环。使用标签跳出 for 循环:
loop:
for {
select {
case name := <-names:
// Process each line
fmt.Println(name)
case err := <-readerr:
log.Fatal(err)
case <-done:
// close(names)
// close(readerr)
break loop
}
}
可以通过删除 done 通道来简化代码。
func main() {
filename := flag.String("filename", "", "The file to parse")
flag.Parse()
if *filename == "" {
log.Fatal("Provide a file to parse")
}
fmt.Println("Getting file")
names := make(chan string)
readerr := make(chan error)
go GetLine(*filename, names, readerr)
loop:
for {
select {
case name := <-names:
// Process each line
fmt.Println(name)
case err := <-readerr:
if err != nil {
log.Fatal(err)
}
break loop
}
}
fmt.Println("Processing Complete")
}
func GetLine(filename string, names chan string, readerr chan error) {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
names <- scanner.Text()
}
readerr <- scanner.Err()
}
在这个具体的例子中,可以重组代码以将接收名称与接收错误分开。
func main() {
filename := flag.String("filename", "", "The file to parse")
flag.Parse()
if *filename == "" {
log.Fatal("Provide a file to parse")
}
fmt.Println("Getting file")
names := make(chan string)
readerr := make(chan error)
go GetLine(*filename, names, readerr)
for name := range names {
fmt.Println(name)
}
if err := <-readerr; err != nil {
log.Fatal(err)
}
fmt.Println("Processing Complete")
}
func GetLine(filename string, names chan string, readerr chan error) {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
names <- scanner.Text()
}
close(names) // close causes range on channel to break out of loop
readerr <- scanner.Err()
}