Dask - Rechunk or array slicing causing large memory usage?
Good afternoon,
I am looking for some help understanding some excessive (or perhaps not excessive) memory usage in my Dask processing chain.
The problem comes with the execution of the following function:
def create_fft_arrays(master_array, fft_size, overlap):
    input_shape = master_array.shape[0]
    # Determine zero pad length (cast to int so it is usable as a shape/chunk size)
    zero_len = int(fft_size - ((input_shape - fft_size) % ((1 - overlap) * fft_size)))
    zeros = da.zeros((zero_len, master_array.shape[1]),
                     dtype = master_array.dtype,
                     chunks = (zero_len, master_array.shape[1]))
    # Create the reshaped array
    reshape_array = da.concatenate((master_array, zeros), axis = 0)
    # Create an index series to use to index the reshaped array for re-blocking
    # (cast to int, since float values cannot be used as slice bounds)
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size - 1),
                          fft_size * overlap).astype(int)
    # Break reshape_array into fft size chunks
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]
    # Returns list of dask arrays
    return [array.rechunk(array.shape) for array in fft_arrays]
where master_array is a Dask Array that is too large to hold in memory (703 × 57 600 001 points in this case).
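(For scale: at float64, the dtype of the fabricated example below, that is 703 × 57 600 001 × 8 bytes ≈ 324 GB. With fft_size = 2**15 and overlap = 0.5 the hop is 16 384 samples, so the function pads the transposed array with 22 527 zero rows and slices it into roughly 3 516 overlapping windows.)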
As a minimal example, the following code causes the same memory usage as the full code further down:
import dask.array as da
import numpy as np

def create_fft_arrays(master_array, fft_size, overlap):
    input_shape = master_array.shape[0]
    # Determine zero pad length (cast to int so it is usable as a shape/chunk size)
    zero_len = int(fft_size - ((input_shape - fft_size) % ((1 - overlap) * fft_size)))
    zeros = da.zeros((zero_len, master_array.shape[1]),
                     dtype = master_array.dtype,
                     chunks = (zero_len, master_array.shape[1]))
    # Create the reshaped array
    reshape_array = da.concatenate((master_array, zeros), axis = 0)
    # Create an index series to use to index the reshaped array for re-blocking
    # (cast to int, since float values cannot be used as slice bounds)
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size - 1),
                          fft_size * overlap).astype(int)
    # Break reshape_array into fft size chunks
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]
    # Returns list of dask arrays
    return [array.rechunk(array.shape) for array in fft_arrays]

# Fabricate an input array of the same shape and size as the problematic dataset
master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372))

# Execute the create_fft_arrays function
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5)
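Before computing anything, the scale of the graphs Dask builds can be inspected (a minimal diagnostic sketch; the counts are approximate, and the reported graph size is the full, un-culled graph):

# Each 2**15-row window sliced from 372-row chunks spans roughly 90 source
# chunks, and there are ~3516 windows in total.
print(master_array.npartitions)      # 154839 source chunks
print(len(fft_arrays))               # ~3516 window arrays
print(len(fft_arrays[0].dask))       # keys in the (un-culled) graph of one window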
To put the code in context, executing the following causes my RAM (20 GB) to max out on execution of the last line, fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5):
import dask.array as da
import h5py as h5
import numpy as np
import os

FORMAT = '.h5'
DSET_PATH = '/DAS/Data'
TSET_PATH = '/DAS/Time'
FFT_SIZE = 2**15
OVERLAP = 0.5

input_dir = 'D:\\'  # note: a raw string cannot end with a backslash
file_paths = []

# Get list of all valid files in directory
for dir_name, sub_dir, f_name in os.walk(input_dir):
    for f in f_name:
        if f[-1*len(FORMAT):] == FORMAT:
            file_paths.append(os.path.join(dir_name, f))

# H5 object for each file
file_handles = [h5.File(f_path, 'r') for f_path in file_paths]

# Handle for dataset and timestamps from each file
dset_handles = [f[DSET_PATH] for f in file_handles]
tset_handles = [f[TSET_PATH] for f in file_handles]

# Create a Dask Array object for each dataset and timestamp set
dset_arrays = [da.from_array(dset, chunks = dset.chunks) for dset in dset_handles]
tset_arrays = [da.from_array(tset, chunks = tset.chunks) for tset in tset_handles]

# Concatenate all datasets along the time axis
master_array = da.concatenate(dset_arrays, axis = 1)

def create_fft_arrays(master_array, fft_size, overlap):
    input_shape = master_array.shape[0]
    # Determine zero pad length (cast to int so it is usable as a shape/chunk size)
    zero_len = int(fft_size - ((input_shape - fft_size) % ((1 - overlap) * fft_size)))
    zeros = da.zeros((zero_len, master_array.shape[1]),
                     dtype = master_array.dtype,
                     chunks = (zero_len, master_array.shape[1]))
    # Create the reshaped array
    reshape_array = da.concatenate((master_array, zeros), axis = 0)
    # Create an index series to use to index the reshaped array for re-blocking
    # (cast to int, since float values cannot be used as slice bounds)
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size - 1),
                          fft_size * overlap).astype(int)
    # Break reshape_array into fft size chunks
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]
    # Returns list of dask arrays
    return [array.rechunk(array.shape) for array in fft_arrays]

# Break master_array into FFT sized arrays with a single chunk in each
fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5)
Following this, I would go on to use the da.fft.fft method to compute the frequency response of each FFT array.
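A minimal sketch of that next step (the name freq_arrays is mine, purely illustrative):

# One FFT per window, taken down the sample axis
freq_arrays = [da.fft.fft(window, axis = 0) for window in fft_arrays]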
Any help or suggestions would be greatly appreciated,
George
Your master array has many chunks:
>>> master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372))
>>> master_array.npartitions
154839
Each chunk comes with some administrative overhead, so it is best to keep the count rather smaller than this; see this section on chunks from the dask.array documentation.
The bottleneck appears when you try to slice that array thousands of times.
Your problem will likely be somewhat mitigated by increasing the chunk size; the documentation linked above has some suggestions.
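Concretely: each 2**15-row window sliced out of 372-row chunks touches roughly ninety source chunks, and there are about 3 500 windows, so the graph gains hundreds of thousands of tasks on top of the 154 839 source chunks; just building and holding that graph is likely what fills your 20 GB. A minimal sketch of the workaround, reusing the fabricated data from your example (the chunk size here is illustrative, chosen to match the FFT window):

import dask.array as da

# Same fabricated data, but chunked along the long axis in FFT-sized blocks
master_array = da.random.normal(10, 0.1, size = (703, 57600001),
                                chunks = (703, 2**15))
print(master_array.npartitions)   # 1758 chunks rather than 154839

# Each 2**15-sample window now overlaps at most two source chunks
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5)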