我想将一个文件拆分成大小相等 "chunks" 或切片,并使用 goroutines 同时处理它们

I want to split a file into equally sized "chunks", or slices and use goroutines to process them simultaneously

使用 Go,我有很大的日志文件。目前我打开它们,创建一个新的扫描仪 bufio.NewScanner,然后 for scanner.Scan() 循环遍历这些行。每行都通过一个处理函数发送,该函数将其与正则表达式匹配并提取数据。我想使用 goroutines 同时以块的形式处理这个文件。我相信这可能比按顺序循环遍历整个文件更快。

每个文件可能需要几秒钟,我想知道我是否可以一次处理一个文件,比如 10 个文件。我相信如果需要我可以牺牲记忆。我有 ~3gb,最大的日志文件可能是 75mb。

我看到 scanner 有一个 .Split() 方法,您可以在其中提供自定义拆分功能,但我无法使用此方法找到好的解决方案。

我也试过创建一个切片的切片,使用 scanner.Scan() 循环遍历扫描仪并将 scanner.Text() 附加到每个切片。 例如:

// pseudocode because I couldn't get this to work either

scanner := bufio.NewScanner(logInfo)
threads := [[], [], [], [], []]

i := 0
for scanner.Scan() {
    i = i + 1
    if i > 5 {
        i = 0
    }
    threads[i] = append(threads[i], scanner.Text())
}
fmt.Println(threads)

我是 Go 新手,很关心效率和性能。我想学习如何编写好的 Go 代码!非常感谢任何帮助或建议。

如果在第 N 行之前处理第 N+1 行是可以接受的,您可以使用简单的扇出模式开始。 The Go blog 解释了这个和更高级的模式,例如取消和扇入。

请注意,这只是一个起点,以使其简单明了。例如,您几乎肯定希望在退出之前等待 process 函数到 return。这在提到的博客 post.

中有解释
package main

import "bufio"

func main() {
    var sc *bufio.Scanner

    lines := make(chan string)

    go process(lines)
    go process(lines)
    go process(lines)
    go process(lines)

    for sc.Scan() {
            lines <- sc.Text()
    }

    close(lines)
}

func process(lines <-chan string) {
    for line := range lines {
            // implement processing here
    }
}

Peter 提供了一个很好的起点,如果你想做类似扇出、扇入模式的事情,你可以这样做:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
)

func main() {
    file, err := os.Open("/path/to/file.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    lines := make(chan string)
    // start four workers to do the heavy lifting
    wc1 := startWorker(lines)
    wc2 := startWorker(lines)
    wc3 := startWorker(lines)
    wc4 := startWorker(lines)
    scanner := bufio.NewScanner(file)
    go func() {
        defer close(lines)
        for scanner.Scan() {
            lines <- scanner.Text()
        }

        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()

    merged := merge(wc1, wc2, wc3, wc4)
    for line := range merged {
        fmt.Println(line)
    }
}

func startWorker(lines <-chan string) <-chan string {
    finished := make(chan string)
    go func() {
        defer close(finished)
        for line := range lines {
            // Do your heavy work here
            finished <- line
        }
    }()
    return finished
}

func merge(cs ...<-chan string) <-chan string {
    var wg sync.WaitGroup
    out := make(chan string)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan string) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}