在 70 GB csv 文件中分块和计算总和、最小值、最大值的最快方法

Fastest way to chunk and calculate sum, min, max in a 70 GB csv file

我有一个 70 GB 的 csv 文件 (data.csv),其结构如下 Timestamp,Price,Volume

1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100

我可以像这样使用 python.

逐行阅读
with open("data.csv") as infile:
    for line in infile:
        row = line.split(",")
        current_price = row[1]

如果价格变动 +10-10,那么我想拍摄该块的快照并计算这些价格的开盘价、收盘价、最高价和最低价。音量也为 volume_sum。在上面的示例中,价格从 100 移动到 90。

这些开盘价、收盘价、最高价、最低价,volume_sum成为新纪录。新块将从 close 价格开始衡量,即 90。即如果价格达到 100 或达到 80,那将成为一个新块。

我在 Ubuntu。因为,我有一个非常大的文件,我正在寻找一种更有效的方法来做到这一点。 [我不想为此使用 pandas。它对我的用例来说很慢]

有人可以指导我吗?

谢谢

您无能为力,因为您需要阅读所有行。 Python I/O 比以前更好,但它不匹配 C 或 Rust 或 AWK 等低级语言。

您可以做的一件事是使用更大的内存块,您可能还想尝试使用字节。也许您可以找到一种方法将作业分成较小的块并使用异步或 mp。

反正 70GB 的文件很大。

正如其他人所说,如果要执行此操作,请使用数据库或其他一些工具(如 Spark)。 70GB 很多。话虽如此,您可以遍历行,跟踪字典缓冲区中的相关值,然后在找到相关差异时写入并清空缓冲区。您应该将所有这些打包在一个 class 中,但其要点如下:

def convert(tup):
    tup = tup.split(",")
    return tup[0], int(tup[1]), int(tup[2])


def write_buffer(buffer, close_price, min_price, max_price, close_stamp, volume_sum):
    buffer["close_price"] = close_price
    buffer["min_price"] = min_price
    buffer["max_price"] = max_price
    buffer["close_stamp"] = close_stamp
    buffer["volume_sum"] = volume_sum


def buffer_to_file(buffer, path_to_file):
    # Ensure insertion order of keys
    keys = [
        "open_stamp",
        "open",
        "close_price",
        "min_price",
        "max_price",
        "volume_sum",
        "close_stamp",
    ]
    with open(path_to_file, "w") as f:
        for k in keys[:-1]:
            f.write(f"{buffer[k]}, ")
        f.write(f"{buffer[keys[-1]]}")  # do not write the last val with a comma
        f.write("\n")  # new line for each buffer write


in_path = "read-large-file.txt"
out_path = "out.txt"


def compute_stats(in_path, out_path):
    with open(in_path) as f:
        time, open_p, vol = convert(f.readline())
        min_p, max_p, volume_sum = open_p, open_p, vol
        buffer = {"open_stamp": time, "open": open_p}
        for line in f:
            t, p, v = convert(line)
            volume_sum += v
            if p > max_p:
                max_p = p
            if p < min_p:
                min_p = p
            if abs(p - open_p) >= 10:
                write_buffer(buffer, p, min_p, max_p, t, volume_sum)
                buffer_to_file(buffer, out_path)
                # Reset buffer and values
                min_p, max_p, volume_sum = p, p, v
                buffer = {"open_stamp": t, "open": p}


compute_stats(in_path, out_path)

假设:

  • 第一个块从第一行中找到的价格开始
  • 切换块时,相应'switch'行的体积不计入新块
  • 虽然 question/description 中没有提到,我们也会为每个区块提供 open/close 次
  • 输出格式:openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume
  • 当我们到达文件末尾时,无论我们是否超过 +/- 10 阈值,继续打印最后一个块(并假设最后一个块中至少有 2 个数据行)

在输入中再添加几行:

$ cat data.csv
1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100
1649289600190,83,90
1649289600288,95,60
1649289600388,79,35
1649289600488,83,100

一个awk想法:

awk -v mvDN=-10 -v mvUP=10 '

function print_stats() {
    if (openTime && openTime != closeTime)
       print openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume

    openTime=closeTime
    openPrc=highPrc=lowPrc=closePrc
    volume=( NR==1 ? currVol : 0 )
}

BEGIN { FS=OFS="," }
      { closeTime=
        currPrc=
        currVol=

        closePrc=currPrc
        volume+=currVol
        highPrc=(currPrc > highPrc ? currPrc : highPrc)
        lowPrc= (currPrc < lowPrc  ? currPrc : lowPrc)

        if ( (currPrc - openPrc <= mvDN) || (currPrc - openPrc >= mvUP) )
           print_stats()
      }
END   { print_stats() }
' data.csv

这会生成:

1649289600174,1649289600188,100,90,109,90,700
1649289600188,1649289600388,90,79,95,79,185
1649289600388,1649289600488,79,83,83,79,100