通过使用 scanner.Scan() 从文件中读取数据来同步可组合 goroutine 时出现问题
Problem synchronizing composable goroutines by reading data from file with scanner.Scan()
我正在通过功能组合而不是 120+LOC 构建一个过滤管道,所有这些都在一个方法中
因为我以后可能会重用这个管道的某些部分。
我无法让它按预期工作。我怀疑函数 readValuesFromFile
在 scanner.Scan()
将值放入 inputStream
通道之前退出(即
该方法的主要 goroutine 在 (1) goroutine 之前退出。
如果我将 scanner.Scan()
替换为在频道中放入一些随机字符串
整个管道按预期工作。
这是问题所在还是我遗漏了什么?
如何优雅地解决这个问题?
谢谢!
func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
inputStream := make(chan string)
go func() { //(1)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() { // (2)
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
return inputStream
}
func validateValues(inputStream <-chan string) <-chan string {
//read from the input stream + validate&filter + creating and putting values in an output stream
}
func writeResults(validStream <-chan string) {
//read from the validated stream and write data to file
}
func main() {
valueStream := readValuesFromFile("myfile.txt")
validatedStream := validateValues(valueStream)
writeResults(validatedStream)
}
函数 readValuesFromFile
保证在第一个值发送到 inputStream
之前 return。在发送方和接收方准备就绪之前,无缓冲通道 inputStream
上的通信不会成功。 inputStream
直到 readValuesFromFile
returns 之后才接收到,因此 goroutine 的发送要到 readValuesFromFile
returns 之后才会成功。
当函数readValuesFromFile
returns时,defer语句关闭扫描器使用的文件。有可能扫描器在文件关闭之前缓冲了一些数据,但也有可能扫描器没有读取任何数据。
通过从 goroutine 关闭文件来修复。
来自扫描仪的错误 return 描述了问题。始终处理错误。
func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
inputStream := make(chan string)
go func() {
defer file.Close()
defer close(inputStream)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
if scanner.Err() != nil {
// Handle error as appropriate for your application.
log.Print("scan error", err)
}
}()
return inputStream
}
我正在通过功能组合而不是 120+LOC 构建一个过滤管道,所有这些都在一个方法中 因为我以后可能会重用这个管道的某些部分。
我无法让它按预期工作。我怀疑函数 readValuesFromFile
在 scanner.Scan()
将值放入 inputStream
通道之前退出(即
该方法的主要 goroutine 在 (1) goroutine 之前退出。
如果我将 scanner.Scan()
替换为在频道中放入一些随机字符串
整个管道按预期工作。
这是问题所在还是我遗漏了什么?
如何优雅地解决这个问题?
谢谢!
func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
defer file.Close()
inputStream := make(chan string)
go func() { //(1)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() { // (2)
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
return inputStream
}
func validateValues(inputStream <-chan string) <-chan string {
//read from the input stream + validate&filter + creating and putting values in an output stream
}
func writeResults(validStream <-chan string) {
//read from the validated stream and write data to file
}
func main() {
valueStream := readValuesFromFile("myfile.txt")
validatedStream := validateValues(valueStream)
writeResults(validatedStream)
}
函数 readValuesFromFile
保证在第一个值发送到 inputStream
之前 return。在发送方和接收方准备就绪之前,无缓冲通道 inputStream
上的通信不会成功。 inputStream
直到 readValuesFromFile
returns 之后才接收到,因此 goroutine 的发送要到 readValuesFromFile
returns 之后才会成功。
当函数readValuesFromFile
returns时,defer语句关闭扫描器使用的文件。有可能扫描器在文件关闭之前缓冲了一些数据,但也有可能扫描器没有读取任何数据。
通过从 goroutine 关闭文件来修复。
来自扫描仪的错误 return 描述了问题。始终处理错误。
func readValuesFromFile(filename string) <-chan string {
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
inputStream := make(chan string)
go func() {
defer file.Close()
defer close(inputStream)
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
if scanner.Err() != nil {
// Handle error as appropriate for your application.
log.Print("scan error", err)
}
}()
return inputStream
}