在 70 GB csv 文件中分块和计算总和、最小值、最大值的最快方法
Fastest way to chunk and calculate sum, min, max in a 70 GB csv file
我有一个 70 GB 的 csv 文件 (data.csv
),其结构如下 Timestamp,Price,Volume
。
1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100
我可以像这样使用 python.
逐行阅读
with open("data.csv") as infile:
for line in infile:
row = line.split(",")
current_price = row[1]
如果价格变动 +10
或 -10
,那么我想拍摄该块的快照并计算这些价格的开盘价、收盘价、最高价和最低价。音量也为 volume_sum
。在上面的示例中,价格从 100 移动到 90。
open
是在块中找到的第一个 价格。在上面的示例中,它是 100
close
是在块中找到的 last 价格。在上面的示例中,它是 90
high
是在块中找到的 max(price)。在上面的示例中,它是 109
low
是在块中找到的 min(price)。在上面的示例中,它是 90
volume_sum
是在块中找到的 sum(volume)。在上面的示例中,它是 700
这些开盘价、收盘价、最高价、最低价,volume_sum成为新纪录。新块将从 close
价格开始衡量,即 90。即如果价格达到 100 或达到 80,那将成为一个新块。
我在 Ubuntu。因为,我有一个非常大的文件,我正在寻找一种更有效的方法来做到这一点。 [我不想为此使用 pandas。它对我的用例来说很慢]
有人可以指导我吗?
谢谢
您无能为力,因为您需要阅读所有行。 Python I/O 比以前更好,但它不匹配 C 或 Rust 或 AWK 等低级语言。
您可以做的一件事是使用更大的内存块,您可能还想尝试使用字节。也许您可以找到一种方法将作业分成较小的块并使用异步或 mp。
反正 70GB 的文件很大。
正如其他人所说,如果要执行此操作,请使用数据库或其他一些工具(如 Spark)。 70GB 很多。话虽如此,您可以遍历行,跟踪字典缓冲区中的相关值,然后在找到相关差异时写入并清空缓冲区。您应该将所有这些打包在一个 class 中,但其要点如下:
def convert(tup):
tup = tup.split(",")
return tup[0], int(tup[1]), int(tup[2])
def write_buffer(buffer, close_price, min_price, max_price, close_stamp, volume_sum):
buffer["close_price"] = close_price
buffer["min_price"] = min_price
buffer["max_price"] = max_price
buffer["close_stamp"] = close_stamp
buffer["volume_sum"] = volume_sum
def buffer_to_file(buffer, path_to_file):
# Ensure insertion order of keys
keys = [
"open_stamp",
"open",
"close_price",
"min_price",
"max_price",
"volume_sum",
"close_stamp",
]
with open(path_to_file, "w") as f:
for k in keys[:-1]:
f.write(f"{buffer[k]}, ")
f.write(f"{buffer[keys[-1]]}") # do not write the last val with a comma
f.write("\n") # new line for each buffer write
in_path = "read-large-file.txt"
out_path = "out.txt"
def compute_stats(in_path, out_path):
with open(in_path) as f:
time, open_p, vol = convert(f.readline())
min_p, max_p, volume_sum = open_p, open_p, vol
buffer = {"open_stamp": time, "open": open_p}
for line in f:
t, p, v = convert(line)
volume_sum += v
if p > max_p:
max_p = p
if p < min_p:
min_p = p
if abs(p - open_p) >= 10:
write_buffer(buffer, p, min_p, max_p, t, volume_sum)
buffer_to_file(buffer, out_path)
# Reset buffer and values
min_p, max_p, volume_sum = p, p, v
buffer = {"open_stamp": t, "open": p}
compute_stats(in_path, out_path)
假设:
- 第一个块从第一行中找到的价格开始
- 切换块时,相应'switch'行的体积不计入新块
- 虽然 question/description 中没有提到,我们也会为每个区块提供 open/close 次
- 输出格式:
openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume
- 当我们到达文件末尾时,无论我们是否超过
+/- 10
阈值,继续打印最后一个块(并假设最后一个块中至少有 2 个数据行)
在输入中再添加几行:
$ cat data.csv
1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100
1649289600190,83,90
1649289600288,95,60
1649289600388,79,35
1649289600488,83,100
一个awk
想法:
awk -v mvDN=-10 -v mvUP=10 '
function print_stats() {
if (openTime && openTime != closeTime)
print openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume
openTime=closeTime
openPrc=highPrc=lowPrc=closePrc
volume=( NR==1 ? currVol : 0 )
}
BEGIN { FS=OFS="," }
{ closeTime=
currPrc=
currVol=
closePrc=currPrc
volume+=currVol
highPrc=(currPrc > highPrc ? currPrc : highPrc)
lowPrc= (currPrc < lowPrc ? currPrc : lowPrc)
if ( (currPrc - openPrc <= mvDN) || (currPrc - openPrc >= mvUP) )
print_stats()
}
END { print_stats() }
' data.csv
这会生成:
1649289600174,1649289600188,100,90,109,90,700
1649289600188,1649289600388,90,79,95,79,185
1649289600388,1649289600488,79,83,83,79,100
我有一个 70 GB 的 csv 文件 (data.csv
),其结构如下 Timestamp,Price,Volume
。
1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100
我可以像这样使用 python.
逐行阅读with open("data.csv") as infile:
for line in infile:
row = line.split(",")
current_price = row[1]
如果价格变动 +10
或 -10
,那么我想拍摄该块的快照并计算这些价格的开盘价、收盘价、最高价和最低价。音量也为 volume_sum
。在上面的示例中,价格从 100 移动到 90。
open
是在块中找到的第一个 价格。在上面的示例中,它是100
close
是在块中找到的 last 价格。在上面的示例中,它是90
high
是在块中找到的 max(price)。在上面的示例中,它是109
low
是在块中找到的 min(price)。在上面的示例中,它是90
volume_sum
是在块中找到的 sum(volume)。在上面的示例中,它是700
这些开盘价、收盘价、最高价、最低价,volume_sum成为新纪录。新块将从 close
价格开始衡量,即 90。即如果价格达到 100 或达到 80,那将成为一个新块。
我在 Ubuntu。因为,我有一个非常大的文件,我正在寻找一种更有效的方法来做到这一点。 [我不想为此使用 pandas。它对我的用例来说很慢]
有人可以指导我吗?
谢谢
您无能为力,因为您需要阅读所有行。 Python I/O 比以前更好,但它不匹配 C 或 Rust 或 AWK 等低级语言。
您可以做的一件事是使用更大的内存块,您可能还想尝试使用字节。也许您可以找到一种方法将作业分成较小的块并使用异步或 mp。
反正 70GB 的文件很大。
正如其他人所说,如果要执行此操作,请使用数据库或其他一些工具(如 Spark)。 70GB 很多。话虽如此,您可以遍历行,跟踪字典缓冲区中的相关值,然后在找到相关差异时写入并清空缓冲区。您应该将所有这些打包在一个 class 中,但其要点如下:
def convert(tup):
tup = tup.split(",")
return tup[0], int(tup[1]), int(tup[2])
def write_buffer(buffer, close_price, min_price, max_price, close_stamp, volume_sum):
buffer["close_price"] = close_price
buffer["min_price"] = min_price
buffer["max_price"] = max_price
buffer["close_stamp"] = close_stamp
buffer["volume_sum"] = volume_sum
def buffer_to_file(buffer, path_to_file):
# Ensure insertion order of keys
keys = [
"open_stamp",
"open",
"close_price",
"min_price",
"max_price",
"volume_sum",
"close_stamp",
]
with open(path_to_file, "w") as f:
for k in keys[:-1]:
f.write(f"{buffer[k]}, ")
f.write(f"{buffer[keys[-1]]}") # do not write the last val with a comma
f.write("\n") # new line for each buffer write
in_path = "read-large-file.txt"
out_path = "out.txt"
def compute_stats(in_path, out_path):
with open(in_path) as f:
time, open_p, vol = convert(f.readline())
min_p, max_p, volume_sum = open_p, open_p, vol
buffer = {"open_stamp": time, "open": open_p}
for line in f:
t, p, v = convert(line)
volume_sum += v
if p > max_p:
max_p = p
if p < min_p:
min_p = p
if abs(p - open_p) >= 10:
write_buffer(buffer, p, min_p, max_p, t, volume_sum)
buffer_to_file(buffer, out_path)
# Reset buffer and values
min_p, max_p, volume_sum = p, p, v
buffer = {"open_stamp": t, "open": p}
compute_stats(in_path, out_path)
假设:
- 第一个块从第一行中找到的价格开始
- 切换块时,相应'switch'行的体积不计入新块
- 虽然 question/description 中没有提到,我们也会为每个区块提供 open/close 次
- 输出格式:
openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume
- 当我们到达文件末尾时,无论我们是否超过
+/- 10
阈值,继续打印最后一个块(并假设最后一个块中至少有 2 个数据行)
在输入中再添加几行:
$ cat data.csv
1649289600174,100,100
1649289600176,105,100
1649289600178,109,100
1649289600179,104,100
1649289600181,101,100
1649289600182,100,100
1649289600188,90,100
1649289600190,83,90
1649289600288,95,60
1649289600388,79,35
1649289600488,83,100
一个awk
想法:
awk -v mvDN=-10 -v mvUP=10 '
function print_stats() {
if (openTime && openTime != closeTime)
print openTime,closeTime,openPrc,closePrc,highPrc,lowPrc,volume
openTime=closeTime
openPrc=highPrc=lowPrc=closePrc
volume=( NR==1 ? currVol : 0 )
}
BEGIN { FS=OFS="," }
{ closeTime=
currPrc=
currVol=
closePrc=currPrc
volume+=currVol
highPrc=(currPrc > highPrc ? currPrc : highPrc)
lowPrc= (currPrc < lowPrc ? currPrc : lowPrc)
if ( (currPrc - openPrc <= mvDN) || (currPrc - openPrc >= mvUP) )
print_stats()
}
END { print_stats() }
' data.csv
这会生成:
1649289600174,1649289600188,100,90,109,90,700
1649289600188,1649289600388,90,79,95,79,185
1649289600388,1649289600488,79,83,83,79,100