如何处理大型但不是大数据的数据集?
How to handle large yet not big-data datasets?
我有一个约 200gb 的数据集,其中包含大约 15 亿个观测值,我需要对其进行 运行 一些条件分析和数据聚合*。
问题是我不习惯(也没有接受过处理)大型数据集。我通常在 R 或 Python 上工作(旁边还有一些 Julia),当我无法将数据集放入内存时,我完全迷失了方向。
人们如何处理这些适合磁盘但不适合内存的数据集?我应该从哪里开始寻找解决方案?是否有一个地方可以集中有关大型但非大型数据集的信息?
*长话短说,我有另一个数据集(适合内存),对于这个小数据集的每一行,我想计算大数据集中与小数据集中的某些条件相匹配的观测值的数量。我最初的反应是 运行 块中的代码,但这是非常低效的,并且需要几个世纪的单处理器计算时间。
既然已经被具体问到,我就描述一下我的文件结构。
我有一个大文件,我们称它为大文件,有(特别是)两个 ID 变量,$ID0$ 和 $ID1$ 和日期变量 $date1$.
我有一个小文件,我们称它为 SMALL,有两个 ID 变量,$ID2$ 和 $ID3$,和一个日期变量 $date2$.
对于每个 $ID2_i$,我想计算所有观察结果
$\{ID0 = ID2_i, date1
我可能误解了你的问题,但对大文件进行分块(正如评论中所建议的那样)在我看来是最直接的方法。
假设您将 200 GB 的文件分成 100 个块,然后迭代这些块并对每个块进行所需的计数,然后聚合结果。如果每块操作在几分钟内运行,你应该没问题,除非你想一遍又一遍地这样做。
要获得更具体的建议,我需要更多地了解数据存储格式。我们是在谈论一个大 .csv
文件吗?在这种情况下,对于 R,您可能会查看 chunked API of the readr
package。为了在 R 中尽快再次进行计数,data.table
包可能会派上用场。
编辑:添加一些示例代码
这不会完全按照您的要求进行,但希望涵盖一些关键点,以便按照我建议的方式制定解决方案。
图书馆(data.table)
图书馆(读者)
IDs <- seq.int(1, 1e2)
日期 <- seq(as.Date("1999/01/01"), as.Date("2000/01/01"), by = "day")
大 <- data.table(id0 = 样本 (ids, 1e6, replace = TRUE),
id1 = 样本 (ids, 1e6, replace = TRUE),
date1 = 样本(日期,1e6,replace = TRUE))
write.csv(大,"big.csv",row.names = 假)
小 <- data.table(id2 = 样本 (ids, 1e2),
id3 = 样本(ids,1e2),
date2 = 样本(日期,1e2))
count_fun <- 函数(x, pos, acc) {
设置DT(x)
tmp <- small[x, 列表(counts = .N),
on = c("id2 == id0", "id3 == id1", "date2 > date1"),
by = .EACHI, nomatch = NULL]
acc[tmp<span class="math-container">$id2] <- acc[tmp$</span>id2] + tmp$counts
acc
}
累加器 <- AccumulateCallback$new(count_fun, acc = rep(0, length(ids)))
计数 <- read_csv_chunked("big.csv", 累加器, chunk_size = 1e4)
有不同的方法
分块数据集(节省未来时间但需要初始时间投入)
分块使您可以简化许多操作,例如洗牌等。
确保每个 subset/chunk 都代表整个数据集。每个块文件应具有相同的行数。
这可以通过一个接一个地向一个文件追加一行来完成。很快,您会意识到打开每个文件并写入一行是低效的。尤其是在同一驱动器上读写时。
-> 添加适合内存的写入和读取缓冲区。
选择适合您需要的块大小。我选择这个特定大小是因为我的默认文本编辑器仍然可以相当快地打开它。
较小的块可以提高性能,特别是如果你想获得 class 分布等指标,因为你只需要循环遍历一个代表性文件以获得对整个数据集的估计,这可能就足够了。
较大的块文件确实可以更好地表示每个文件中的整个数据集,但您也可以只浏览 x 个较小的块文件。
我确实为此使用了 c#,因为我在这方面更有经验,因此我可以使用完整的功能集,例如将任务 reading / processing / writing
拆分到不同的线程上。
如果您有使用 python 或 r 的经验,我想应该也有类似的功能。并行化可能是此类大型数据集的一个重要因素。
分块数据集可以建模为一个交错数据集,您可以使用张量处理单元对其进行处理。这可能会产生最佳性能之一,并且可以在本地以及真正大型机器上的云中执行。但这需要大量学习tensorflow
使用reader并逐步读取文件
你不想做类似 all_of_it = file.read()
的事情,而是想使用某种流reader。以下函数逐行读取其中一个块文件(或整个 300gb 数据集)以计算文件中的每个 class。通过一次处理一行,您的程序将不会溢出内存。
您可能想要添加一些进度指示,例如 X lines/s 或 X MBbs,以便估计总处理时间。
def getClassDistribution(path):
classes = dict()
# open sample file and count classes
with open(path, "r",encoding="utf-8",errors='ignore') as f:
line = f.readline()
while line:
if line != '':
labelstring = line[-2:-1]
if labelstring == ',':
labelstring = line[-1:]
label = int(labelstring)
if label in classes:
classes[label] += 1
else:
classes[label] = 1
line = f.readline()
return classes
我结合使用了分块数据集和估计。
性能陷阱
尽可能避免嵌套循环。另一个循环内的每个循环将复杂度乘以 n
尽可能一次处理数据。每个循环都增加了 n 的复杂度
- 如果您的数据采用 csv 格式,请避免使用
cells = int(line.Split(',')[8])
等预制函数,这会很快导致内存吞吐量瓶颈。可以在 getClassDistribution
中找到一个合适的例子,我只想获取标签。
以下 C# 函数超快地将 csv 行拆分为元素。
// Call function
ThreadPool.QueueUserWorkItem((c) => AnalyzeLine("05.02.2020,12.20,10.13").Wait());
// Parralelize this on multiple cores/threads for ultimate performance
private async Task AnalyzeLine(string line)
{
PriceElement elementToAdd = new PriceElement();
int counter = 0;
string temp = "";
foreach (char c in line)
{
if (c == ',')
{
switch (counter)
{
case 0:
elementToAdd.spotTime = DateTime.Parse(temp, CultureInfo.InvariantCulture);
break;
case 1:
elementToAdd.buyPrice = decimal.Parse(temp);
break;
case 2:
elementToAdd.sellPrice = decimal.Parse(temp);
break;
}
temp = "";
counter++;
}
else temp += c;
}
// compare the price element to conditions on another thread
Observate(elementToAdd);
}
创建数据库并加载数据
在处理类似 csv 的数据时,您可以将数据加载到数据库中。
数据库是为容纳大量数据而设计的,您可以期待非常高的性能。
与原始数据相比,数据库可能会在您的磁盘上使用更多 space。这就是我放弃使用数据库的原因之一。
硬件优化
如果您的代码优化得很好,您的瓶颈很可能是硬盘吞吐量。
- 如果数据适合您的本地硬盘驱动器,请在本地使用它,因为这将消除网络延迟(想象一下,本地网络中每条记录有 2-5 毫秒,远程位置有 10-100 毫秒)。
- 使用现代硬盘。一个 1tb NVME SSD 今天的成本约为 130(intel 600p 1tb)。 nvme ssd 使用 pcie,比普通 ssd 快 5 倍左右,比普通硬盘驱动器快 50 倍,尤其是在快速写入不同位置(分块数据)时。近年来,SSD 的容量大幅增加,对于这样的任务来说,这将是野蛮的。
以下屏幕截图提供了在同一台机器上使用相同数据进行 TensorFlow 训练的性能比较。一次保存在本地标准 ssd 上,一次保存在本地网络中的网络附加存储(普通硬盘)上。
看起来像一个 O(n^2) 问题:BIG 中的每个元素都必须与 BIG 中的所有其他元素进行比较。
也许您可以将比较所需的所有字段放入内存中(将其余部分留在文件中)。
例如:1.5G observations x 1 date (4 bytes) x 2 IDs (8 bytes) 可以装进 18GB.
也许你可以按日期对 BIG 进行排序,然后你的问题就变成了 O(n x log(n))。
也许你可以将 BIG 分成 ID3i = ID3j 的块。
有很多可能性。
我有一个约 200gb 的数据集,其中包含大约 15 亿个观测值,我需要对其进行 运行 一些条件分析和数据聚合*。
问题是我不习惯(也没有接受过处理)大型数据集。我通常在 R 或 Python 上工作(旁边还有一些 Julia),当我无法将数据集放入内存时,我完全迷失了方向。
人们如何处理这些适合磁盘但不适合内存的数据集?我应该从哪里开始寻找解决方案?是否有一个地方可以集中有关大型但非大型数据集的信息?
*长话短说,我有另一个数据集(适合内存),对于这个小数据集的每一行,我想计算大数据集中与小数据集中的某些条件相匹配的观测值的数量。我最初的反应是 运行 块中的代码,但这是非常低效的,并且需要几个世纪的单处理器计算时间。
既然已经被具体问到,我就描述一下我的文件结构。
我有一个大文件,我们称它为大文件,有(特别是)两个 ID 变量,$ID0$ 和 $ID1$ 和日期变量 $date1$.
我有一个小文件,我们称它为 SMALL,有两个 ID 变量,$ID2$ 和 $ID3$,和一个日期变量 $date2$.
对于每个 $ID2_i$,我想计算所有观察结果
$\{ID0 = ID2_i, date1
我可能误解了你的问题,但对大文件进行分块(正如评论中所建议的那样)在我看来是最直接的方法。
假设您将 200 GB 的文件分成 100 个块,然后迭代这些块并对每个块进行所需的计数,然后聚合结果。如果每块操作在几分钟内运行,你应该没问题,除非你想一遍又一遍地这样做。
要获得更具体的建议,我需要更多地了解数据存储格式。我们是在谈论一个大 .csv
文件吗?在这种情况下,对于 R,您可能会查看 chunked API of the readr
package。为了在 R 中尽快再次进行计数,data.table
包可能会派上用场。
编辑:添加一些示例代码
这不会完全按照您的要求进行,但希望涵盖一些关键点,以便按照我建议的方式制定解决方案。
图书馆(data.table)
图书馆(读者)
IDs <- seq.int(1, 1e2)
日期 <- seq(as.Date("1999/01/01"), as.Date("2000/01/01"), by = "day")
大 <- data.table(id0 = 样本 (ids, 1e6, replace = TRUE),
id1 = 样本 (ids, 1e6, replace = TRUE),
date1 = 样本(日期,1e6,replace = TRUE))
write.csv(大,"big.csv",row.names = 假)
小 <- data.table(id2 = 样本 (ids, 1e2),
id3 = 样本(ids,1e2),
date2 = 样本(日期,1e2))
count_fun <- 函数(x, pos, acc) {
设置DT(x)
tmp <- small[x, 列表(counts = .N),
on = c("id2 == id0", "id3 == id1", "date2 > date1"),
by = .EACHI, nomatch = NULL]
acc[tmp<span class="math-container">$id2] <- acc[tmp$</span>id2] + tmp$counts
acc
}
累加器 <- AccumulateCallback$new(count_fun, acc = rep(0, length(ids)))
计数 <- read_csv_chunked("big.csv", 累加器, chunk_size = 1e4)
有不同的方法
分块数据集(节省未来时间但需要初始时间投入)
分块使您可以简化许多操作,例如洗牌等。
确保每个 subset/chunk 都代表整个数据集。每个块文件应具有相同的行数。
这可以通过一个接一个地向一个文件追加一行来完成。很快,您会意识到打开每个文件并写入一行是低效的。尤其是在同一驱动器上读写时。
-> 添加适合内存的写入和读取缓冲区。
选择适合您需要的块大小。我选择这个特定大小是因为我的默认文本编辑器仍然可以相当快地打开它。
较小的块可以提高性能,特别是如果你想获得 class 分布等指标,因为你只需要循环遍历一个代表性文件以获得对整个数据集的估计,这可能就足够了。
较大的块文件确实可以更好地表示每个文件中的整个数据集,但您也可以只浏览 x 个较小的块文件。
我确实为此使用了 c#,因为我在这方面更有经验,因此我可以使用完整的功能集,例如将任务 reading / processing / writing
拆分到不同的线程上。
如果您有使用 python 或 r 的经验,我想应该也有类似的功能。并行化可能是此类大型数据集的一个重要因素。
分块数据集可以建模为一个交错数据集,您可以使用张量处理单元对其进行处理。这可能会产生最佳性能之一,并且可以在本地以及真正大型机器上的云中执行。但这需要大量学习tensorflow
使用reader并逐步读取文件
你不想做类似 all_of_it = file.read()
的事情,而是想使用某种流reader。以下函数逐行读取其中一个块文件(或整个 300gb 数据集)以计算文件中的每个 class。通过一次处理一行,您的程序将不会溢出内存。
您可能想要添加一些进度指示,例如 X lines/s 或 X MBbs,以便估计总处理时间。
def getClassDistribution(path):
classes = dict()
# open sample file and count classes
with open(path, "r",encoding="utf-8",errors='ignore') as f:
line = f.readline()
while line:
if line != '':
labelstring = line[-2:-1]
if labelstring == ',':
labelstring = line[-1:]
label = int(labelstring)
if label in classes:
classes[label] += 1
else:
classes[label] = 1
line = f.readline()
return classes
我结合使用了分块数据集和估计。
性能陷阱
尽可能避免嵌套循环。另一个循环内的每个循环将复杂度乘以 n尽可能一次处理数据。每个循环都增加了 n 的复杂度
- 如果您的数据采用 csv 格式,请避免使用
cells = int(line.Split(',')[8])
等预制函数,这会很快导致内存吞吐量瓶颈。可以在getClassDistribution
中找到一个合适的例子,我只想获取标签。
以下 C# 函数超快地将 csv 行拆分为元素。
// Call function
ThreadPool.QueueUserWorkItem((c) => AnalyzeLine("05.02.2020,12.20,10.13").Wait());
// Parralelize this on multiple cores/threads for ultimate performance
private async Task AnalyzeLine(string line)
{
PriceElement elementToAdd = new PriceElement();
int counter = 0;
string temp = "";
foreach (char c in line)
{
if (c == ',')
{
switch (counter)
{
case 0:
elementToAdd.spotTime = DateTime.Parse(temp, CultureInfo.InvariantCulture);
break;
case 1:
elementToAdd.buyPrice = decimal.Parse(temp);
break;
case 2:
elementToAdd.sellPrice = decimal.Parse(temp);
break;
}
temp = "";
counter++;
}
else temp += c;
}
// compare the price element to conditions on another thread
Observate(elementToAdd);
}
创建数据库并加载数据
在处理类似 csv 的数据时,您可以将数据加载到数据库中。
数据库是为容纳大量数据而设计的,您可以期待非常高的性能。
与原始数据相比,数据库可能会在您的磁盘上使用更多 space。这就是我放弃使用数据库的原因之一。
硬件优化
如果您的代码优化得很好,您的瓶颈很可能是硬盘吞吐量。
- 如果数据适合您的本地硬盘驱动器,请在本地使用它,因为这将消除网络延迟(想象一下,本地网络中每条记录有 2-5 毫秒,远程位置有 10-100 毫秒)。
- 使用现代硬盘。一个 1tb NVME SSD 今天的成本约为 130(intel 600p 1tb)。 nvme ssd 使用 pcie,比普通 ssd 快 5 倍左右,比普通硬盘驱动器快 50 倍,尤其是在快速写入不同位置(分块数据)时。近年来,SSD 的容量大幅增加,对于这样的任务来说,这将是野蛮的。
以下屏幕截图提供了在同一台机器上使用相同数据进行 TensorFlow 训练的性能比较。一次保存在本地标准 ssd 上,一次保存在本地网络中的网络附加存储(普通硬盘)上。
看起来像一个 O(n^2) 问题:BIG 中的每个元素都必须与 BIG 中的所有其他元素进行比较。
也许您可以将比较所需的所有字段放入内存中(将其余部分留在文件中)。 例如:1.5G observations x 1 date (4 bytes) x 2 IDs (8 bytes) 可以装进 18GB.
也许你可以按日期对 BIG 进行排序,然后你的问题就变成了 O(n x log(n))。
也许你可以将 BIG 分成 ID3i = ID3j 的块。
有很多可能性。