如何有效地转置 67 gb file/Dask 数据帧而不将其完全加载到内存中?

How can I efficiently transpose a 67 gb file/Dask dataframe without loading it entirely into memory?

我有 3 个相当大的文件(67gb、36gb、30gb)需要用来训练模型。但是,特征是行,样本是列。由于 Dask 尚未实现转置并按行拆分存储数据帧,因此我需要自己编写一些东西来执行此操作。有没有一种方法可以高效转置而不加载到内存中?

我有 16 GB 的内存可供使用,并且正在使用 jupyter notebook。我写了一些相当慢的代码,但真的很感激一个更快的解决方案。下面代码的速度,需要一个月的时间才能完成所有的文件。最慢几个数量级的步骤是awk。

import dask.dataframe as dd
import subprocess
from IPython.display import clear_output

df = dd.read_csv('~/VeryLarge.tsv')
with open('output.csv','wb') as fout:
    for i in range(1, len(df.columns)+1):
        print('AWKing')
        #read a column from the original data and store it elsewhere
        x = "awk '{print $"+str(i)+"}' ~/VeryLarge.tsv > ~/file.temp"
        subprocess.check_call([x], shell=True)

        print('Reading')
        #load and transpose the column
        col = pd.read_csv('~/file.temp')
        row = col.T
        display(row)

        print('Deleting')
        #remove the temporary file created
        !rm ../file.temp

        print('Storing')
        #store the row in its own csv just to be safe. not entirely necessary
        row.to_csv('~/columns/col_{:09d}'.format(i), header=False)

        print('Appending')
        #append the row (transposed column) to the new file
        with open('~/columns/col_{:09d}', 'rb') as fin:
            for line in fin:
                fout.write(line)

        clear_output()
        #Just a measure of progress
        print(i/len(df.columns))

数据本身是 1000 万行(特征)和 2000 列(样本)。它只需要转置。目前,它看起来像这样:

我会创建一个中间文件并使用 fp.seek 以新顺序以二进制格式写入它们,然后再将其转换回新的 CSV。 给定行,列变成列,行 - sys.float_info 会给你每个元素的大小,每个元素的位置((是列 * old_row_length + 行)* float 的大小)。

然后将它们重新组合成 CSV,方法是将它们转换回文本并每行读入 old_count_rows。

我修改了我的原始脚本以部署在任意数量的 cpu 上。它工作得更快,因为我可以使用多个线程并部署在 aws 上。我用的是96核的机器,大概8个小时就完成了任务。我很惊讶,因为这几乎是线性缩放!这个想法是使一些重复的任务可分配。然后你就可以给cpus分配任务了。这里的并行化是通过命令 pool.map().

完成的

从命令行使用这个脚本非常简单:

python3 transposer.py -i largeFile.tsv

如果需要,您也可以指定其他参数。

import argparse, subprocess
import numpy as np
import pandas as pd
import dask.dataframe as dd
from IPython.display import clear_output
from contextlib import closing
from os import cpu_count
from multiprocessing import Pool

parser = argparse.ArgumentParser(description='Transpose csv')
parser.add_argument('-i', '--infile', help='Path to input folder',
                    default=None)
parser.add_argument('-s', '--sep', help='input separator',
                    default='\t')

args = parser.parse_args()
infile = args.infile
sep = args.sep    
df = pd.read_csv(infile, sep='\t', nrows=3)    

def READ_COL(item):
    print(item)
    outfile = 'outfile{}.temp'.format(item)
    if item !=0:
                x = "awk '{print $"+str(item)+"}' "+infile+" > "+outfile
                subprocess.check_call([x], shell=True)
                col = pd.read_csv(outfile)
                row = col.T
                display(row)
                row.to_csv('col_{:09d}.csv'.format(item), header=False)
                subprocess.check_call(['rm '+outfile], shell=True)
                print(item/len(df.columns))

with closing(Pool(processes=cpu_count())) as pool:
    pool.map(READ_COL, list(range(1, len(df.columns)+1)))

在此之后,您应该有一些文件是 t运行 列。您只需要使用 cat 或其他一些命令行工具将它们连接在一起。我只是运行cat col_* > full_file_transposed.csv