并发逐行读取文件

Reading a file line-by-line with concurrency

我想做什么

GetLine 中,我尝试使用 bufio.Scanner 逐行解析文件,并尝试进行并发处理。 在获取每一行中的文本之后,我通过 string 的通道将其发送给调用者(main 函数)。除了值,我还发送错误和完成标志(通过 done 通道)。因此,这应该能够在处理当前行时获取新行以在单独的 goroutine 中处理。

我实际做了什么

var READCOMPLETE = errors.New("Completed Reading")

func main() {

    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)
    done := make(chan bool)

    go GetLine(*filename, names, readerr, done)

    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            log.Fatal(err)

        case <-done:
            // close(names)
            // close(readerr)
            break
        }
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error, done chan bool) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
        //fmt.Println(scanner.Text())
    }

    if err := scanner.Err(); err != nil {
        readerr <- err
    }

    done <- true
}

我得到了什么 运行

运行时错误:fatal error: all goroutines are asleep - deadlock!

我尝试修复了什么?

阅读 this 关于错误消息的回答后,我尝试在 select 语句的最后一个子句中关闭通道 namesreaderr,如注释。但是,该程序仍然崩溃并显示一条日志消息。我无法进一步修复它,非常感谢任何帮助。
欢迎提供学习资源。

P.S:我对 GoLang 比较陌生,仍在学习如何使用 Go 中的 CSP 并发模型。事实上,这是我第一次尝试编写同步并发程序。

select 中的 break 语句跳出 select。应用程序必须在完成后跳出 for 循环。使用标签跳出 for 循环:

loop:
    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            log.Fatal(err)

        case <-done:
            // close(names)
            // close(readerr)
            break loop
        }
    }

可以通过删除 done 通道来简化代码。

func main() {

    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)

    go GetLine(*filename, names, readerr)

loop:
    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            if err != nil {
                log.Fatal(err)
            }
            break loop
        }
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
    }
    readerr <- scanner.Err()
}

在这个具体的例子中,可以重组代码以将接收名称与接收错误分开。

func main() {
    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)

    go GetLine(*filename, names, readerr)

    for name := range names {
        fmt.Println(name)
    }
    if err := <-readerr; err != nil {
        log.Fatal(err)
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
    }
    close(names) // close causes range on channel to break out of loop
    readerr <- scanner.Err()
}