通过使用 scanner.Scan() 从文件中读取数据来同步可组合 goroutine 时出现问题

Problem synchronizing composable goroutines by reading data from file with scanner.Scan()

我正在通过功能组合而不是 120+LOC 构建一个过滤管道,所有这些都在一个方法中 因为我以后可能会重用这个管道的某些部分。

我无法让它按预期工作。我怀疑函数 readValuesFromFilescanner.Scan() 将值放入 inputStream 通道之前退出(即 该方法的主要 goroutine 在 (1) goroutine 之前退出。

如果我将 scanner.Scan() 替换为在频道中放入一些随机字符串 整个管道按预期工作。

这是问题所在还是我遗漏了什么?

如何优雅地解决这个问题?

谢谢!

func readValuesFromFile(filename string) <-chan string {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    inputStream := make(chan string)

    go func() { //(1)
        count := 0
        scanner := bufio.NewScanner(file)
        for scanner.Scan() { // (2)
            inputStream <- strings.TrimSpace(scanner.Text())
            count = count + 1
        }
        
        close(inputStream)
    }()
    return inputStream
}

func validateValues(inputStream <-chan string) <-chan string {
    //read from the input stream + validate&filter + creating and putting values in an output stream
}

func writeResults(validStream <-chan string) {
    //read from the validated stream and write data to file
}

func main() {
    valueStream := readValuesFromFile("myfile.txt")
    validatedStream := validateValues(valueStream)
    
    writeResults(validatedStream)
}

函数 readValuesFromFile 保证在第一个值发送到 inputStream 之前 return。在发送方和接收方准备就绪之前,无缓冲通道 inputStream 上的通信不会成功。 inputStream 直到 readValuesFromFile returns 之后才接收到,因此 goroutine 的发送要到 readValuesFromFile returns 之后才会成功。

当函数readValuesFromFile returns时,defer语句关闭扫描器使用的文件。有可能扫描器在文件关闭之前缓冲了一些数据,但也有可能扫描器没有读取任何数据。

通过从 goroutine 关闭文件来修复。

来自扫描仪的错误 return 描述了问题。始终处理错误。

func readValuesFromFile(filename string) <-chan string {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }

    inputStream := make(chan string)

    go func() {
        defer file.Close()
        defer close(inputStream)
        count := 0
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            inputStream <- strings.TrimSpace(scanner.Text())
            count = count + 1
        }
        if scanner.Err() != nil {
            // Handle error as appropriate for your application.
            log.Print("scan error", err)
        }
    }()

    return inputStream
}