我想将一个文件拆分成大小相等 "chunks" 或切片,并使用 goroutines 同时处理它们
I want to split a file into equally sized "chunks", or slices and use goroutines to process them simultaneously
使用 Go,我有很大的日志文件。目前我打开它们,创建一个新的扫描仪 bufio.NewScanner
,然后 for scanner.Scan()
循环遍历这些行。每行都通过一个处理函数发送,该函数将其与正则表达式匹配并提取数据。我想使用 goroutines 同时以块的形式处理这个文件。我相信这可能比按顺序循环遍历整个文件更快。
每个文件可能需要几秒钟,我想知道我是否可以一次处理一个文件,比如 10 个文件。我相信如果需要我可以牺牲记忆。我有 ~3gb,最大的日志文件可能是 75mb。
我看到 scanner
有一个 .Split()
方法,您可以在其中提供自定义拆分功能,但我无法使用此方法找到好的解决方案。
我也试过创建一个切片的切片,使用 scanner.Scan()
循环遍历扫描仪并将 scanner.Text()
附加到每个切片。
例如:
// pseudocode because I couldn't get this to work either
scanner := bufio.NewScanner(logInfo)
threads := [[], [], [], [], []]
i := 0
for scanner.Scan() {
i = i + 1
if i > 5 {
i = 0
}
threads[i] = append(threads[i], scanner.Text())
}
fmt.Println(threads)
我是 Go 新手,很关心效率和性能。我想学习如何编写好的 Go 代码!非常感谢任何帮助或建议。
如果在第 N 行之前处理第 N+1 行是可以接受的,您可以使用简单的扇出模式开始。 The Go blog 解释了这个和更高级的模式,例如取消和扇入。
请注意,这只是一个起点,以使其简单明了。例如,您几乎肯定希望在退出之前等待 process
函数到 return。这在提到的博客 post.
中有解释
package main
import "bufio"
func main() {
var sc *bufio.Scanner
lines := make(chan string)
go process(lines)
go process(lines)
go process(lines)
go process(lines)
for sc.Scan() {
lines <- sc.Text()
}
close(lines)
}
func process(lines <-chan string) {
for line := range lines {
// implement processing here
}
}
Peter 提供了一个很好的起点,如果你想做类似扇出、扇入模式的事情,你可以这样做:
package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
func main() {
file, err := os.Open("/path/to/file.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
lines := make(chan string)
// start four workers to do the heavy lifting
wc1 := startWorker(lines)
wc2 := startWorker(lines)
wc3 := startWorker(lines)
wc4 := startWorker(lines)
scanner := bufio.NewScanner(file)
go func() {
defer close(lines)
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
merged := merge(wc1, wc2, wc3, wc4)
for line := range merged {
fmt.Println(line)
}
}
func startWorker(lines <-chan string) <-chan string {
finished := make(chan string)
go func() {
defer close(finished)
for line := range lines {
// Do your heavy work here
finished <- line
}
}()
return finished
}
func merge(cs ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan string) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
使用 Go,我有很大的日志文件。目前我打开它们,创建一个新的扫描仪 bufio.NewScanner
,然后 for scanner.Scan()
循环遍历这些行。每行都通过一个处理函数发送,该函数将其与正则表达式匹配并提取数据。我想使用 goroutines 同时以块的形式处理这个文件。我相信这可能比按顺序循环遍历整个文件更快。
每个文件可能需要几秒钟,我想知道我是否可以一次处理一个文件,比如 10 个文件。我相信如果需要我可以牺牲记忆。我有 ~3gb,最大的日志文件可能是 75mb。
我看到 scanner
有一个 .Split()
方法,您可以在其中提供自定义拆分功能,但我无法使用此方法找到好的解决方案。
我也试过创建一个切片的切片,使用 scanner.Scan()
循环遍历扫描仪并将 scanner.Text()
附加到每个切片。
例如:
// pseudocode because I couldn't get this to work either
scanner := bufio.NewScanner(logInfo)
threads := [[], [], [], [], []]
i := 0
for scanner.Scan() {
i = i + 1
if i > 5 {
i = 0
}
threads[i] = append(threads[i], scanner.Text())
}
fmt.Println(threads)
我是 Go 新手,很关心效率和性能。我想学习如何编写好的 Go 代码!非常感谢任何帮助或建议。
如果在第 N 行之前处理第 N+1 行是可以接受的,您可以使用简单的扇出模式开始。 The Go blog 解释了这个和更高级的模式,例如取消和扇入。
请注意,这只是一个起点,以使其简单明了。例如,您几乎肯定希望在退出之前等待 process
函数到 return。这在提到的博客 post.
package main
import "bufio"
func main() {
var sc *bufio.Scanner
lines := make(chan string)
go process(lines)
go process(lines)
go process(lines)
go process(lines)
for sc.Scan() {
lines <- sc.Text()
}
close(lines)
}
func process(lines <-chan string) {
for line := range lines {
// implement processing here
}
}
Peter 提供了一个很好的起点,如果你想做类似扇出、扇入模式的事情,你可以这样做:
package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
func main() {
file, err := os.Open("/path/to/file.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
lines := make(chan string)
// start four workers to do the heavy lifting
wc1 := startWorker(lines)
wc2 := startWorker(lines)
wc3 := startWorker(lines)
wc4 := startWorker(lines)
scanner := bufio.NewScanner(file)
go func() {
defer close(lines)
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
merged := merge(wc1, wc2, wc3, wc4)
for line := range merged {
fmt.Println(line)
}
}
func startWorker(lines <-chan string) <-chan string {
finished := make(chan string)
go func() {
defer close(finished)
for line := range lines {
// Do your heavy work here
finished <- line
}
}()
return finished
}
func merge(cs ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan string) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}