dask 核外矩阵乘法调度
dask out-of-core matrix multiply scheduling
我正在尝试为大小为 10,000 * 800,000 的矩阵 X 计算矩阵乘积 Y=XX^T。矩阵 X 存储在磁盘上的 h5py 文件中。生成的 Y 应该是存储在同一个 h5py 文件中的 10,000*10,000 矩阵。这是一个可重现的示例代码。
import dask.array as da
from blaze import into
into("h5py:///tmp/dummy::/X", da.ones((10**4, 8*10**5), chunks=(10**4,10**4)))
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**4,10**4)))
y = x.dot(x.T)
into("h5py:///tmp/dummy::/Y", y)
我希望这个计算能够顺利进行,因为每个 (10,000*10,000) 块都应该单独转置,然后是点积,然后求和到最终结果。但是,运行 此计算会填满我的 RAM 和交换内存,直到进程最终被终止。
这里是用dot_graph绘制的计算图示例:Computation graph sample
根据调度文档 http://dask.pydata.org/en/latest/scheduling-policy.html
我希望上面的张量点中间结果在单独计算后会被一个一个地汇总到最后的总和结果中。这将释放这些tensordot中间结果的内存,这样我们就不会面临内存错误。
玩小玩具示例:
from dask.diagnostics import Profiler, CacheProfiler, ResourceProfiler
# Experiment on a (1,0000 * 5,000) matrix X split into 500 chunks of size (1,000 * 10)
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**3,10)))[:10**3,5000]
y = x.T.dot(x)
with Profiler() as prof, CacheProfiler() as cprof, ResourceProfiler() as rprof:
into("h5py:///tmp/dummy::/X", y)
rprof.visualize()
我得到以下显示:
Ressource profiler
其中绿色条代表求和运算,黄色和紫色条分别代表get_array和tensordot运算。这似乎表明 sum 操作在对它们求和之前等待所有中间 tensordot 操作被执行。这也可以解释我的进程 运行 内存不足并被杀死。
所以我的问题是:
- 这是求和运算的正常行为吗?
- 有没有办法强制它先计算中间金额
计算中间张量点乘积并保存在内存中?
- 如果不是,是否有不涉及溢出到磁盘的解决方法?
非常感谢任何帮助!
一般来说,在小 space 中执行密集矩阵-矩阵乘法很难。这是因为每个中间块都会被几个输出块使用。
According to the sheduling doc that http: //dask.pydata.org/en/latest/scheduling-policy.html I would expect the upper tensordot intermediary results to be summed up one by one into the last sum result as soon as they have been individually computed.
您显示的图形有许多输入到求和函数。 Dask 将等到所有这些输入都完成后再 运行 求和函数。任务调度程序不知道 sum 是关联的,可以 运行 一块一块地。这种语义信息的缺乏是你使用像 Dask 这样的通用任务调度系统而不是专用的线性代数库所付出的代价。如果您的目标是尽可能高效地执行密集线性代数,那么您可能想看看其他地方;这是一个覆盖面广的领域。
因此,正如所写,您的内存要求是 至少 8e5 * 1e4 * dtype.itemsize
,假设 Dask 以完全正确的顺序进行(它应该 大部分做)。
您可以尝试以下方法:
- 沿非收缩维度减小块大小
- 使用 0.14.1 之后的 Dask 版本(0.14.2 应该在 2017 年 5 月 5 日之前发布),我们在图中将那些大额调用明确分解为许多较小的调用。
使用分布式调度程序,它可以更有效地处理将数据写入磁盘。
from dask.distributed import Client
client = Client(processes=False) # create a local cluster in this process
我正在尝试为大小为 10,000 * 800,000 的矩阵 X 计算矩阵乘积 Y=XX^T。矩阵 X 存储在磁盘上的 h5py 文件中。生成的 Y 应该是存储在同一个 h5py 文件中的 10,000*10,000 矩阵。这是一个可重现的示例代码。
import dask.array as da
from blaze import into
into("h5py:///tmp/dummy::/X", da.ones((10**4, 8*10**5), chunks=(10**4,10**4)))
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**4,10**4)))
y = x.dot(x.T)
into("h5py:///tmp/dummy::/Y", y)
我希望这个计算能够顺利进行,因为每个 (10,000*10,000) 块都应该单独转置,然后是点积,然后求和到最终结果。但是,运行 此计算会填满我的 RAM 和交换内存,直到进程最终被终止。
这里是用dot_graph绘制的计算图示例:Computation graph sample
根据调度文档 http://dask.pydata.org/en/latest/scheduling-policy.html 我希望上面的张量点中间结果在单独计算后会被一个一个地汇总到最后的总和结果中。这将释放这些tensordot中间结果的内存,这样我们就不会面临内存错误。
玩小玩具示例:
from dask.diagnostics import Profiler, CacheProfiler, ResourceProfiler
# Experiment on a (1,0000 * 5,000) matrix X split into 500 chunks of size (1,000 * 10)
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**3,10)))[:10**3,5000]
y = x.T.dot(x)
with Profiler() as prof, CacheProfiler() as cprof, ResourceProfiler() as rprof:
into("h5py:///tmp/dummy::/X", y)
rprof.visualize()
我得到以下显示: Ressource profiler
其中绿色条代表求和运算,黄色和紫色条分别代表get_array和tensordot运算。这似乎表明 sum 操作在对它们求和之前等待所有中间 tensordot 操作被执行。这也可以解释我的进程 运行 内存不足并被杀死。
所以我的问题是:
- 这是求和运算的正常行为吗?
- 有没有办法强制它先计算中间金额 计算中间张量点乘积并保存在内存中?
- 如果不是,是否有不涉及溢出到磁盘的解决方法?
非常感谢任何帮助!
一般来说,在小 space 中执行密集矩阵-矩阵乘法很难。这是因为每个中间块都会被几个输出块使用。
According to the sheduling doc that http: //dask.pydata.org/en/latest/scheduling-policy.html I would expect the upper tensordot intermediary results to be summed up one by one into the last sum result as soon as they have been individually computed.
您显示的图形有许多输入到求和函数。 Dask 将等到所有这些输入都完成后再 运行 求和函数。任务调度程序不知道 sum 是关联的,可以 运行 一块一块地。这种语义信息的缺乏是你使用像 Dask 这样的通用任务调度系统而不是专用的线性代数库所付出的代价。如果您的目标是尽可能高效地执行密集线性代数,那么您可能想看看其他地方;这是一个覆盖面广的领域。
因此,正如所写,您的内存要求是 至少 8e5 * 1e4 * dtype.itemsize
,假设 Dask 以完全正确的顺序进行(它应该 大部分做)。
您可以尝试以下方法:
- 沿非收缩维度减小块大小
- 使用 0.14.1 之后的 Dask 版本(0.14.2 应该在 2017 年 5 月 5 日之前发布),我们在图中将那些大额调用明确分解为许多较小的调用。
使用分布式调度程序,它可以更有效地处理将数据写入磁盘。
from dask.distributed import Client client = Client(processes=False) # create a local cluster in this process