您可以使用 Dask DataFrame 在 dask.delayed 中查找 table 吗?
Can you use Dask DataFrame as lookup table in dask.delayed?
我的数据规模很大,DataFrame 合并不太可能成功——之前的尝试导致过度的数据混洗、调度程序内存不足错误以及工作进程中的通信超时,即使有索引,分区、大量工作人员、总内存等
我在“手动”合并方面取得了一些成功,方法是将数据写入小文件并在需要查找时将它们读回。我们目前正在 dask.delayed 函数中执行此操作。这显然需要大量磁盘 I/O.
Dask delayed best practices (https://docs.dask.org/en/latest/delayed-best-practices.html)警告不要将DataFrame发送到delayed,提到不要调用delayed form delayed,并告诉我们在分布式场景中避免全局状态。这些最佳实践让我相信没有一种安全的方法可以从延迟函数中使用 DataFrame——我的理解是否正确?
不幸的是,数据的规模和敏感性使其难以作为工作示例在此处共享,但请考虑 20+gb 的查找 table(在较小的一侧)加入 65+gb table(在非常小的一边)。它们单独在 Dask DataFrame 分布式内存中工作没有问题。我们的处理需要一个列上的索引,而合并需要一个单独的索引(强制大洗牌和重新分区)。
是否有不同的方法来合并我可能遗漏的大型 DataFrame?
一般来说很难说,因为最佳过程将取决于数据特征,但一些选项是:
手动拆分:如您所述,这就是我可能会做的,除非我可能不使用 delayed
加载数据;
手动索引:如果您合并的数据具有某种结构,例如时间或特定 categories/order,那么您可以通过生成额外的查找来改进手动拆分 table (在文件级别),例如如果文件A、B、C包含X、Y、Z,但是如果你还想要W,那么你需要加载文件D;
数据库:使用带索引的数据库(也可以通过dask-sql
查询)
从延迟构建查找 table 的问题在于,如果不计算延迟,dask 将不知道延迟对象内部的内容。您可以通过使用您的数据知识构建更复杂的对象来帮助 dask,例如通过构建延迟对象的字典,您可以减少 dask 需要完成的工作量(数据传输、内存加载等)。这是一个粗略的伪代码(但这不是推荐,只是一种可能性):
files = {
'A' : list_of_files_containing_A,
'B': list_of_files_containing_B,
# more values
}
@delayed
del load_table(file_list):
df = pd.concat(pd.read_csv(f) for f in file_list)
# some processing
return df
lookup = {k: load_table(v) for k, v in files.items()}
# further downstream, when you want to load 'A' objects, you would refer to lookup['A']
# or in general for some value in variable lookup_value
# lookup[lookup_value]
# this could reduce the workload for dask by constraining
# the amount of data to check
虽然不确定上述方法的效率如何,但使用可用资源更像是一种技巧。
我的数据规模很大,DataFrame 合并不太可能成功——之前的尝试导致过度的数据混洗、调度程序内存不足错误以及工作进程中的通信超时,即使有索引,分区、大量工作人员、总内存等
我在“手动”合并方面取得了一些成功,方法是将数据写入小文件并在需要查找时将它们读回。我们目前正在 dask.delayed 函数中执行此操作。这显然需要大量磁盘 I/O.
Dask delayed best practices (https://docs.dask.org/en/latest/delayed-best-practices.html)警告不要将DataFrame发送到delayed,提到不要调用delayed form delayed,并告诉我们在分布式场景中避免全局状态。这些最佳实践让我相信没有一种安全的方法可以从延迟函数中使用 DataFrame——我的理解是否正确?
不幸的是,数据的规模和敏感性使其难以作为工作示例在此处共享,但请考虑 20+gb 的查找 table(在较小的一侧)加入 65+gb table(在非常小的一边)。它们单独在 Dask DataFrame 分布式内存中工作没有问题。我们的处理需要一个列上的索引,而合并需要一个单独的索引(强制大洗牌和重新分区)。
是否有不同的方法来合并我可能遗漏的大型 DataFrame?
一般来说很难说,因为最佳过程将取决于数据特征,但一些选项是:
手动拆分:如您所述,这就是我可能会做的,除非我可能不使用
delayed
加载数据;手动索引:如果您合并的数据具有某种结构,例如时间或特定 categories/order,那么您可以通过生成额外的查找来改进手动拆分 table (在文件级别),例如如果文件A、B、C包含X、Y、Z,但是如果你还想要W,那么你需要加载文件D;
数据库:使用带索引的数据库(也可以通过
dask-sql
查询)
从延迟构建查找 table 的问题在于,如果不计算延迟,dask 将不知道延迟对象内部的内容。您可以通过使用您的数据知识构建更复杂的对象来帮助 dask,例如通过构建延迟对象的字典,您可以减少 dask 需要完成的工作量(数据传输、内存加载等)。这是一个粗略的伪代码(但这不是推荐,只是一种可能性):
files = {
'A' : list_of_files_containing_A,
'B': list_of_files_containing_B,
# more values
}
@delayed
del load_table(file_list):
df = pd.concat(pd.read_csv(f) for f in file_list)
# some processing
return df
lookup = {k: load_table(v) for k, v in files.items()}
# further downstream, when you want to load 'A' objects, you would refer to lookup['A']
# or in general for some value in variable lookup_value
# lookup[lookup_value]
# this could reduce the workload for dask by constraining
# the amount of data to check
虽然不确定上述方法的效率如何,但使用可用资源更像是一种技巧。