方法执行期间内存消耗高

High memory consumption during method execution

对于一个项目,我想为 CSV 的大约 5000 万行中的每一行手动创建结构。为此,我逐行遍历文件并将每个结构附加到一个切片中。这是简化的方法:

func readCSV(filePath string) DataFrame {
    file, _ := os.Open(filePath)
    defer file.Close()
    var rows []Row
    scanner := bufio.NewScanner(file)
    scanner.Scan()
    for scanner.Scan() {
        parts := strings.Split(scanner.Text(), ",")
        if len(parts) < 7 {
            continue
        }
        column1, _ := strconv.Atoi(parts[0])
        column2, _ := strconv.ParseFloat(parts[1], 32)
        column3, _ := strconv.ParseFloat(parts[2], 32)
        column4 := parts[3]
        column5, _ := strconv.ParseFloat(parts[4], 32)
        column6 := parts[5]
        column7 := parts[6]
        row := Row{
            Column1: column1,
            Column2: column2,
            Column3: column3,
            Column4: column4,
            Column5: column5,
            Column6: column6,
            Column7: column7,
        }
        rows = append(rows, row)
    }
    return DataFrame{
        Rows: rows,
    }
}

生成的 DataFrame 有大约 3 GB 的内存。问题是 RAM 消耗在方法执行期间达到顶峰,Go 进程使用 15GB+ 内存,使该函数无法用于我的目的。切片返回后,进程的 RAM 消耗下降到预期的 3GB。

堆配置文件如下所示:

    3.26GB     5.81GB (flat, cum)   100% of Total
         .          .     62:   scanner := bufio.NewScanner(file)
         .          .     63:   scanner.Scan()
         .          .     64:   for scanner.Scan() {
         .     2.55GB     65:           parts := strings.Split(scanner.Text(), ",")
         .          .     66:           if len(parts) < 7 {
         .          .     67:                   continue
         .          .     68:           }
         .          .     69:           column1, _ := strconv.Atoi(parts[0])
         .          .     70:           column2, _ := strconv.ParseFloat(parts[1], 32)
         .          .     71:           column3, _ := strconv.ParseFloat(parts[2], 32)
         .          .     72:           column4 := parts[3]
         .          .     73:           column5, _ := strconv.ParseFloat(parts[4], 32)
         .          .     74:           column6 := parts[5]
         .          .     75:           column7 := parts[6]
         .          .     76:           row := Row{
         .          .     77:                   Column1: column1,
         .          .     78:                   Column2: column2,
         .          .     79:                   Column3: column3,
         .          .     80:                   Column4: column4,
         .          .     81:                   Column5: column5,
         .          .     82:                   Column6: column6,
         .          .     83:                   Column7: column7,
         .          .     84:           }
    3.26GB     3.26GB     85:           rows = append(rows, row)
         .          .     86:   }
         .          .     87:
         .          .     88:   return DataFrame{
         .          .     89:           Rows: rows,

我不知道高内存消耗是从哪里来的。我试图手动调用垃圾收集器但没有成功。谁能给我提示?

rows 是 Row 结构的数组,而不是指针。每行花费 32 个字节用于浮点数和整数,加上字符串的长度。 5000 万行可能会变得相当大。更糟糕的是,append 将增长 rows 大约 1.5 倍,因此它最终会分配大量额外内存,同时也会丢弃许多需要进行垃圾收集的较小版本。然后 append(rows, row) 是一个副本,意味着更多的分配和释放。而且它必须等待垃圾回收,这会增加内存使用量。

这可以通过存储引用来避免。这应该意味着更少的分配并使 rows 显着变小。

var rows []*Row
...
rows = append(rows, &row)

然而,真正的问题是一下子把所有东西都吞进去。这是去!我们可以使用 channels and goroutines 在处理的同时同时读取一行。

CSV 具有欺骗性。 Go 已经有一个 CSV 库,encoding/csv,所以我们将使用它。

# A handy function to make ignoring errors a bit less laborious.
func IgnoreError(value interface{}, err error) interface{} {
    return value
}

# Its more flexible to take an io.Reader.
# It returns a channel of individual rows.
func readCSV(input io.Reader) chan Row {
    rows := make(chan Row)
    go func() {
        defer close(rows)

        # Use encoding/csv.
        # Let it reuse its backing array for each row.
        # Ignore rows with the wrong number of columns.
        reader := csv.NewReader(input)
        reader.FieldsPerRecord = 7
        reader.ReuseRecord = true

        for {
            parts, err := reader.Read()

            if err == io.EOF {
                break
            }
            if err != nil {
                continue
            }

            # Send each row down the channel.
            rows <- Row{
                Column1: IgnoreError(strconv.Atoi(parts[0])).(int),
                Column2: IgnoreError(strconv.ParseFloat(parts[1], 32)).(float64),
                Column3: IgnoreError(strconv.ParseFloat(parts[2], 32)).(float64),
                Column4: parts[3],
                Column5: IgnoreError(strconv.ParseFloat(parts[4], 32)).(float64),
                Column6: parts[5],
                Column7: parts[6],
            }
        }
    }();
    
    return rows;
}

func main() {
    file, err := os.Open("test.csv")
    if err != nil {
        log.Fatal(err)
    }
    
    rows := readCSV(file)
    for row := range rows {
        fmt.Println(row)
    }
}

现在一次只加载一行。内存使用量应保持不变。