Undefined behaviour while loading a large CSV concurrently using Goroutines

I am trying to load a large CSV file using goroutines in Golang. The dimensions of the CSV are (254882, 100). But when I parse the CSV and store it into a 2D list using my goroutines, I get fewer than 254882 rows, and the number of rows varies on each run. I feel this is happening because of the goroutines, but I can't pinpoint the reason. Could anyone please help me? I am also new to Golang. Here is my code:

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

My main function is shown below:

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

If anyone needs to download the CSV as well, here is the link (485 MB): https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing

Data Race Detector


Your results are undefined because you have a data race.

~/gopath/src$ go run -race racer.go
==================
WARNING: DATA RACE
Write at 0x00c00008a060 by goroutine 6:
  runtime.mapassign_faststr()
      /home/peter/go/src/runtime/map_faststr.go:202 +0x0
  main.main.func2()
      /home/peter/gopath/src/racer.go:16 +0x6a

Previous write at 0x00c00008a060 by goroutine 5:
  runtime.mapassign_faststr()
      /home/peter/go/src/runtime/map_faststr.go:202 +0x0
  main.main.func1()
      /home/peter/gopath/src/racer.go:11 +0x6a

Goroutine 6 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:14 +0x88

Goroutine 5 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:9 +0x5b
==================
fatal error: concurrent map writes
==================
WARNING: DATA RACE
Write at 0x00c00009a088 by goroutine 6:
  main.main.func2()
      /home/peter/gopath/src/racer.go:16 +0x7f

Previous write at 0x00c00009a088 by goroutine 5:
  main.main.func1()
      /home/peter/gopath/src/racer.go:11 +0x7f

Goroutine 6 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:14 +0x88

Goroutine 5 (running) created at:
  main.main()
      /home/peter/gopath/src/racer.go:9 +0x5b
==================

goroutine 34 [running]:
runtime.throw(0x49e156, 0x15)
    /home/peter/go/src/runtime/panic.go:608 +0x72 fp=0xc000094718 sp=0xc0000946e8 pc=0x44b342
runtime.mapassign_faststr(0x48ace0, 0xc00008a060, 0x49c9c3, 0x8, 0xc00009a088)
    /home/peter/go/src/runtime/map_faststr.go:211 +0x46c fp=0xc000094790 sp=0xc000094718 pc=0x43598c
main.main.func1(0x49c9c3, 0x8)
    /home/peter/gopath/src/racer.go:11 +0x6b fp=0xc0000947d0 sp=0xc000094790 pc=0x47ac6b
runtime.goexit()
    /home/peter/go/src/runtime/asm_amd64.s:1340 +0x1 fp=0xc0000947d8 sp=0xc0000947d0 pc=0x473061
created by main.main
    /home/peter/gopath/src/racer.go:9 +0x5c

goroutine 1 [sleep]:
time.Sleep(0x5f5e100)
    /home/peter/go/src/runtime/time.go:105 +0x14a
main.main()
    /home/peter/gopath/src/racer.go:19 +0x96

goroutine 35 [runnable]:
main.main.func2(0x49c9c3, 0x8)
    /home/peter/gopath/src/racer.go:16 +0x6b
created by main.main
    /home/peter/gopath/src/racer.go:14 +0x89
exit status 2
~/gopath/src$ 

racer.go:

package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "os"
    "strconv"
    "sync"
    "time"
)

func loadCSV(csvFile string) (*[][]float64, error) {
    startTime := time.Now()
    var dataset [][]float64
    f, err := os.Open(csvFile)
    if err != nil {
        return &dataset, err
    }
    r := csv.NewReader(bufio.NewReader(f))
    counter := 0
    var wg sync.WaitGroup
    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if counter != 0 {
            wg.Add(1)
            go func(r []string, dataset *[][]float64) {
                var temp []float64
                for _, each := range record {
                    f, err := strconv.ParseFloat(each, 64)
                    if err == nil {
                        temp = append(temp, f)
                    }
                }
                *dataset = append(*dataset, temp)
                wg.Done()
            }(record, &dataset)
        }
        counter++
    }
    wg.Wait()
    duration := time.Now().Sub(startTime)
    log.Printf("Loaded %d rows in %v seconds", counter, duration)
    return &dataset, nil
}

func main() {
    // runtime.GOMAXPROCS(4)
    dataset, err := loadCSV("/home/peter/AvgW2V_train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(*dataset))
}

There is no need to use *[][]float64, since that would effectively be a double pointer: a slice value already contains a pointer to its backing array, so you can pass and return [][]float64 directly.
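
As a small aside, here is a minimal, self-contained sketch (hypothetical names, not part of the program above or below) showing that returning the slice value is enough: only the slice header (pointer, length, capacity) is copied on return, not the row data, so the caller still sees every appended row.

package main

import "fmt"

// build returns the slice by value; only the small slice header is copied
// on return, not the rows it points to.
func build() [][]float64 {
    var dataset [][]float64
    dataset = append(dataset, []float64{1, 2, 3})
    dataset = append(dataset, []float64{4, 5, 6})
    return dataset
}

func main() {
    dataset := build()
    fmt.Println(len(dataset)) // 2
}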

I have made some minor changes to your program.

dataset is available inside the new goroutine, since it is declared above that block of code.

Similarly, record is also available, but since the record variable changes from one iteration to the next, we need to pass it to the new goroutine as an argument.

dataset, on the other hand, does not need to be passed, because the variable itself never changes, and that is exactly what we want: every goroutine should append its temp to the same dataset. The short sketch below illustrates the difference.
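
To make that concrete, here is a minimal sketch (with made-up names, separate from the program being fixed) of both styles. The closure that captures the reused variable races with the loop and may observe a stale or duplicated value, while the closure that receives the value as an argument always gets the value intended for its iteration.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    current := "" // reused on every iteration, like the reader's record
    for _, w := range []string{"a", "b", "c"} {
        current = w

        wg.Add(1)
        go func() {
            defer wg.Done()
            // Captures the shared variable: this read races with the writes
            // in the loop, so it may print a later iteration's value.
            fmt.Println("captured:", current)
        }()

        wg.Add(1)
        go func(s string) {
            defer wg.Done()
            // Receives a copy made when the goroutine is launched, so it
            // always prints the value from its own iteration.
            fmt.Println("passed:  ", s)
        }(current)
    }
    wg.Wait()
}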

But when multiple goroutines try to append to the same variable, a race condition occurs: several goroutines write to the same variable concurrently.

So we need to make sure that only one goroutine can append at any given time, and therefore we use a lock (a sync.Mutex) so that the appends happen sequentially.

package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "os"
    "strconv"
    "sync"
)

func loadCSV(csvFile string) [][]float64 {
    var dataset [][]float64

    f, _ := os.Open(csvFile)

    r := csv.NewReader(f)

    var wg sync.WaitGroup
    l := new(sync.Mutex) // lock

    for record, err := r.Read(); err == nil; record, err = r.Read() {
        wg.Add(1)

        go func(record []string) {
            defer wg.Done()

            var temp []float64
            for _, each := range record {
                if f, err := strconv.ParseFloat(each, 64); err == nil {
                    temp = append(temp, f)
                }
            }
            l.Lock() // lock before writing
            dataset = append(dataset, temp) // write
            l.Unlock() // unlock

        }(record)
    }

    wg.Wait()

    return dataset
}

func main() {
    dataset := loadCSV("train.csv")
    fmt.Println(len(dataset))
}

Some errors are left unhandled to keep the example minimal, but you should handle them.
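
For completeness, here is a sketch of what the loader could look like with the error handling put back in. The use of fmt.Errorf with %w and the choice to simply skip fields that do not parse as numbers (for example a header row) are assumptions of this sketch, not something prescribed above.

package main

import (
    "encoding/csv"
    "fmt"
    "io"
    "os"
    "strconv"
    "sync"
)

func loadCSV(csvFile string) ([][]float64, error) {
    f, err := os.Open(csvFile)
    if err != nil {
        return nil, fmt.Errorf("open %s: %w", csvFile, err)
    }
    defer f.Close()

    r := csv.NewReader(f)

    var (
        dataset [][]float64
        mu      sync.Mutex
        wg      sync.WaitGroup
    )

    for {
        record, err := r.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            wg.Wait()
            return nil, fmt.Errorf("read %s: %w", csvFile, err)
        }

        wg.Add(1)
        go func(record []string) {
            defer wg.Done()

            temp := make([]float64, 0, len(record))
            for _, each := range record {
                v, err := strconv.ParseFloat(each, 64)
                if err != nil {
                    continue // skip fields that are not numbers, e.g. a header row
                }
                temp = append(temp, v)
            }

            mu.Lock() // serialise the appends, exactly as in the answer above
            dataset = append(dataset, temp)
            mu.Unlock()
        }(record)
    }

    wg.Wait()
    return dataset, nil
}

func main() {
    dataset, err := loadCSV("train.csv")
    if err != nil {
        panic(err)
    }
    fmt.Println(len(dataset))
}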