Dask - Rechunk or array slicing causing large memory usage?

Good afternoon,

I am looking for some help understanding some excessive (or possibly not excessive) memory usage in my Dask processing chain.

The problem occurs on execution of the following function:

import dask.array as da
import numpy as np

def create_fft_arrays(master_array, fft_size, overlap):

    input_shape = master_array.shape[0]
    # Determine zero pad length (cast to int: (1 - overlap) * fft_size is a float)
    zero_len = fft_size - ((input_shape - fft_size) % int((1 - overlap) * fft_size))

    zeros = da.zeros((zero_len, master_array.shape[1]),
                     dtype = master_array.dtype,
                     chunks = (zero_len, master_array.shape[1]))
    # Append the zero padding to the end of the input array
    reshape_array = da.concatenate((master_array, zeros), axis = 0)
    # Create an index series to use to index the reshaped array for re-blocking
    # (cast the step to int so the indices are valid slice bounds)
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size - 1), int(fft_size * overlap))
    # Break reshape_array into overlapping fft-size windows
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]

    # Return a list of dask arrays, each rechunked into a single chunk
    return [array.rechunk(array.shape) for array in fft_arrays]
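To make the windowing arithmetic concrete, here is a quick check with small, made-up numbers (hypothetical values, not from the real dataset):

fft_size, overlap = 8, 0.5
input_shape = 20

zero_len = fft_size - ((input_shape - fft_size) % int((1 - overlap) * fft_size))
# zero_len = 8 - (12 % 4) = 8, padding the input to a length of 28

fft_index = np.arange(0, (input_shape + zero_len) - (fft_size - 1), int(fft_size * overlap))
# fft_index = [0, 4, 8, 12, 16, 20]: six half-overlapping windows of length 8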

where master_array is a Dask array too large to hold in memory (703 × 57600001 points in this case).

As a minimal example, the following code causes the same memory usage as the full code further below:

import dask.array as da
import numpy as np

def create_fft_arrays(master_array, fft_size, overlap):

    input_shape = master_array.shape[0]
    # Determine zero pad length (cast to int: (1 - overlap) * fft_size is a float)
    zero_len = fft_size - ((input_shape - fft_size) % int((1 - overlap) * fft_size))

    zeros = da.zeros((zero_len, master_array.shape[1]),
                     dtype = master_array.dtype,
                     chunks = (zero_len, master_array.shape[1]))
    # Append the zero padding to the end of the input array
    reshape_array = da.concatenate((master_array, zeros), axis = 0)
    # Create an index series to use to index the reshaped array for re-blocking
    # (cast the step to int so the indices are valid slice bounds)
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size - 1), int(fft_size * overlap))
    # Break reshape_array into overlapping fft-size windows
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]

    # Return a list of dask arrays, each rechunked into a single chunk
    return [array.rechunk(array.shape) for array in fft_arrays]

# Fabricate an input array of the same shape and size as the problematic dataset
master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372))

# Execute the create_fft_arrays function
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5)
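The scale of the task graph this builds can be inspected before anything is computed (a rough check of my own; exact counts may vary by Dask version):

print(master_array.npartitions)  # 154839 chunks in the fabricated input
print(len(fft_arrays))           # ~3516 overlapping window slices
# Every one of those slices carries a graph that references the tiny source
# chunks it spans, so graph construction alone consumes a great deal of memory.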

To put the code in context, executing the following maxes out my RAM (20 GB) when the last line, fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5), executes:

import dask.array as da

import h5py as h5
import numpy as np

import os

FORMAT = '.h5'
DSET_PATH = '/DAS/Data'
TSET_PATH = '/DAS/Time'

FFT_SIZE = 2**15
OVERLAP = 0.5

input_dir = 'D:\\'  # a raw string cannot end with a backslash
file_paths = []

# Get list of all valid files in directory
for dir_name, sub_dir, f_name in os.walk(input_dir):
    for f in f_name:
        if f.endswith(FORMAT):
            file_paths.append(os.path.join(dir_name, f))

#H5 object for each file
file_handles = [h5.File(f_path, 'r') for f_path in file_paths]

# Handle for dataset and timestamps from each file
dset_handles = [f[DSET_PATH] for f in file_handles]
tset_handles = [f[TSET_PATH] for f in file_handles]

# Create a Dask Array object for each dataset and timestamp set
dset_arrays = [da.from_array(dset, chunks = dset.chunks) for dset in dset_handles]
tset_arrays = [da.from_array(tset, chunks = tset.chunks) for tset in tset_handles]

# Concatenate all datasets along the time axis
master_array = da.concatenate(dset_arrays, axis = 1)

def create_fft_arrays(master_array, fft_size, overlap):

    input_shape = master_array.shape[0]
    # Determine zero pad length (cast to int: (1 - overlap) * fft_size is a float)
    zero_len = fft_size - ((input_shape - fft_size) % int((1 - overlap) * fft_size))

    zeros = da.zeros((zero_len, master_array.shape[1]),
                     dtype = master_array.dtype,
                     chunks = (zero_len, master_array.shape[1]))
    # Append the zero padding to the end of the input array
    reshape_array = da.concatenate((master_array, zeros), axis = 0)
    # Create an index series to use to index the reshaped array for re-blocking
    # (cast the step to int so the indices are valid slice bounds)
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size - 1), int(fft_size * overlap))
    # Break reshape_array into overlapping fft-size windows
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]

    # Return a list of dask arrays, each rechunked into a single chunk
    return [array.rechunk(array.shape) for array in fft_arrays]

# Break master_array into FFT sized arrays with a single chunk in each
fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5)

After this, I will go on to use the da.fft.fft method to compute the frequency response of each FFT array.
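Roughly, that next step would look like this (a sketch only; stacking the windows into one array is my assumption about how they would be combined):

# Stack the single-chunk windows, then FFT along the window axis
fft_stack = da.stack(fft_arrays)               # (n_windows, fft_size, n_channels)
freq_response = da.fft.fft(fft_stack, axis=1)  # FFT over each fft_size-long window

Note that da.fft.fft requires the transformed axis to sit in a single chunk, which the rechunk at the end of create_fft_arrays already guarantees.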

Any help or suggestions would be greatly appreciated,

George

Your master array has many chunks:

>>> master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372))
>>> master_array.npartitions
154839

Each chunk carries some administrative overhead, so it is best to keep the chunk count somewhat smaller than this. See the section on chunks in the dask.array documentation.
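To put a rough number on that (my own illustration; the dask documentation generally recommends much larger chunks):

>>> 703 * 372 * master_array.dtype.itemsize  # bytes of data per chunk
2092128

Each chunk holds only about 2 MB of data, yet the scheduler pays per-task bookkeeping for all 154839 of them, and every subsequent slice multiplies that bookkeeping.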

The bottleneck arises when you try to slice that array many thousands of times.

Your problem will likely be solved, at least in part, by increasing the chunk size. The documentation linked above offers some suggestions.
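As a minimal sketch of that advice (the chunk width of 2**15 is my choice, matched to FFT_SIZE so that each window slice touches at most two chunks):

import dask.array as da

# Same fabricated input, but with chunks as wide as one FFT window
master_array = da.random.normal(10, 0.1, size = (703, 57600001),
                                chunks = (703, 2**15))
print(master_array.npartitions)  # 1758 chunks instead of 154839

With chunks aligned to the FFT size, each reshape_array[x:x + fft_size] slice maps onto whole chunks instead of walking thousands of 372-point pieces, so both the graph size and the memory needed to hold it drop sharply.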