并行和延迟使用 Dask(和/或 xarray)的短时傅里叶变换(频谱分析)
Short Time Fourier Transform (spectrum analysis) in parallel & lazily using Dask (and / or xarray)
问题:
我正在尝试对长时间序列数据进行频谱分析(请参阅数据结构示例,它基本上是具有时间索引的一维数据)。为了节省时间和内存等。我想并行和懒惰地执行此操作(使用 xarray 和/或 dask)。
最好(或更好)的方法是什么?
我试过的:
(示例和代码见下文)
使用 scipy.signal.stft 和 xr.apply_ufunc:
问题: ValueError,仅在输入数据为 1 个块时有效,不适用于大数据。
使用 scipy.signal.stft 与 dask.array.from_delayed:
问题:输出数据总是 1 个块,这使得进一步处理数据变得困难。 (之后重新分块会使 RAM 过载)
使用 xr.Dataset.rolling.construct 的中级(惰性)二维变换。这里 1 维是时间,行是短时间 windows,我在其上执行 fft(“滚动 window”)。
使用数据:[1, 2, 3, 4, 5] 和 3 的滚动 window 这将变为:
时间索引
滚动window
00:00:00
NaN, NaN, 1
00:00:01
南, 1, 2
00:00:02
1, 2, 3
00:00:03
2、3、4
00:00:04
3、4、5
然后使用 xr.apply ufunc 来计算这个新数组上每个时间点行(矢量化)的 fft,也是懒惰的(具体请参见下面的示例)。
这是可行的,因为它是延迟计算的,所以它也适用于大数据集。
问题:
- 似乎比 scipy.signal.stft 慢(参见下面的基准测试)
- overlap/stepsize 无法更改(与 scipy stft 中的 noverlap 一样)。
- 是否真的需要中间步骤或者是否有立即计算滚动的方法window fft?
- 在某些情况下仍然会使 RAM 过载
其他:我也尝试了其他方法,例如用numba优化函数(但在np.fft.fft上效果不佳)但是为了保持这个已经很长post 简而言之,我只包含了我尝试过的最有希望的方法。
我该怎么做才能做到这一点,和/或提高方法 3 的性能或使用方法 1 或 2,或者使用 xarray / Dask 进行延迟和并行处理的其他方法?
谢谢!
代码/示例:
导入和测试数据集:
import xarray as xr
import dask.array as da
import dask
import numpy as np
from scipy import signal as ss
# variables
fs= 128 # rec frequency
_l = int(1e3) # data length
window = fs * 10 # window over which to do stft
# Create data
data = np.random.random(_l * fs)
data = xr.DataArray(da.from_array(data, name="x")).assign_coords({"dim_0":np.arange(_l * fs)})
data = data.chunk(chunks=_l/10 * fs) # create multiple chunks to simulate bigger data
data = data.to_dataset()
方法一:
def stft(data,):
f, t, Zxx = ss.stft(data,
fs=fs,
nperseg=window,
noverlap= window - 1
)
return np.abs(Zxx)
data['fft'] = xr.apply_ufunc(stft,
data.x,
input_core_dims=[['dim_0']],
output_core_dims=[["f", "t"]], # define newly created output dims
dask="parallelized",
output_dtypes=['f8'],
output_sizes={"f": window / 2 + 1,
"t": data.x.size - 1},
)
输出:
ValueError: dimension 'dim_0' on 0th function argument to apply_ufunc with dask='parallelized' consists of multiple chunks, but is also a core dimension. To fix, rechunk into a single dask array chunk along this dimension, i.e., ``.chunk({'dim_0': -1})``, but beware that this may significantly increase memory usage.
方法二:
def stft1(x):
f, t, Zxx = ss.stft(x,
fs=fs,
nperseg=window,
noverlap=window - 1
)
return np.abs(Zxx)
da.from_delayed(dask.delayed(stft1)(data.x), shape=(window / 2 + 1, data.x.size - 1), meta="f8")
输出:
方法三:(可行但不是最优?)
def xr_make_fft(ds, par):
ds['roll_window'] = np.arange(window) # create dimension for rolling window
# FFT function to apply vectorized:
def xr_fft(x):
fft = np.fft.fft(x)[xr_fft.idx]
return np.abs(fft)
# make new parameter with rolling windows stacked
ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",)
# Calc FFT Freq domain
fftfreq = np.fft.fftfreq(window, 1 / fs)
idx = (0 < fftfreq)
freq = fftfreq[idx]
ds['Frequency'] = freq
xr_fft.idx = idx
ds[f'FFT'] = xr.apply_ufunc(xr_fft,
ds[f"FFT_window"],
vectorize=True,
input_core_dims=[["roll_window"]], # define input dim over which to vectorize (this dim in inserted completely)
output_core_dims=[["Frequency"]], # define newly created output dims
dask="parallelized",
output_dtypes=['f8'],
output_sizes={"Frequency": len(freq)},
)
ds = ds.drop(f'FFT_window').drop_dims('roll_window')
return ds
data = xr_make_fft(data, "x")
输出:
小型数据集的性能测试:
128e3 长度数据(未超载 RAM 的最大数据集):
方法二:
%timeit da.from_delayed(dask.delayed(stft1)(data.x), shape=(window / 2 + 1, data.x.size - 1), meta="f8").compute()
3.53 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
方法三:
%timeit data.FFT.compute()
6.04 s ± 219 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.signal.stft
%timeit ss.stft(data.x, fs, nperseg=window, noverlap=window-1)[2]
1.87 s ± 8.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
经过测试多种方法,这似乎是最好的解决方案:
创建示例数据和导入:
import dask.array as da
import dask
import numpy as np
fs = 128 # sample rate
_l = int(5e6) # n samples, change if you like
data = da.random.random(_l * fs) # really long array with random numbers
解决方案(使用 dask 数组):
此解决方案使用 sliding_window_view 在数据上创建滑动 window 的二维变换,然后在该二维 window 上应用 fft(如 proposed/explained问题)。
如果需要,您始终可以将输出 dask 数组添加到 xarray 数据集。您还可以将此方法应用于嵌入 xarray 数据集中的 dask 数组。
第 1 步:
将数据分块,以便通过 2d 转换它具有大小合适的块(稍后重新分块是昂贵的/导致崩溃)。要测试哪种块大小最有效,请在小样本上进行测试并检查 fft_window(参见步骤 3)变量的块大小。
data = data.rechunk(chunks=24000) # 24000 works great on my 16 GB system adjust if needed
2d window with 重新分块:
2d window 没有 重新分块:
第 2 步:
计算 fft 参数并创建 fft 函数
FFT_WINDOW = 10 * fs # use a 10 second window
fftfreq = np.fft.fftfreq(FFT_WINDOW, 1 / fs) # create fft freq array
idx = (0 < fftfreq) # create index of positive freq
freq = fftfreq[idx] # create freq array
# create fft function:
da_fft = lambda x: da.absolute(da.fft.fft(x, axis=1)[:, idx]
第 3 步:
进行二维变换并计算 fft
with dask.config.set(**{'array.slicing.split_large_chunks': False}): # surpress warning
fft_window = da.overlap.sliding_window_view(data, FFT_WINDOW)[:, ::-1] # [:, ::-1] is to backward fill window
fft = da_fft(fft_window)
PAD_LEN = data.size - fft.shape[0]
fft = da.pad(fft, ((PAD_LEN, 0), (0, 0)), constant_values=np.nan) # pad to match original array
注意:如果 dask 版本 < 2021.03.1,请改用 numpy.lib.stride_tricks.sliding_window_view 和 da.map_overlap。性能相当
输出:
性能(超过100000点计算平均值):
问题中的方法 3(从此处添加步骤 1 以防止内存过载):
5.36 s ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
这里提出的方法:
1.83 s ± 52.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
问题:
我正在尝试对长时间序列数据进行频谱分析(请参阅数据结构示例,它基本上是具有时间索引的一维数据)。为了节省时间和内存等。我想并行和懒惰地执行此操作(使用 xarray 和/或 dask)。
最好(或更好)的方法是什么?
我试过的:
(示例和代码见下文)
使用 scipy.signal.stft 和 xr.apply_ufunc:
问题: ValueError,仅在输入数据为 1 个块时有效,不适用于大数据。使用 scipy.signal.stft 与 dask.array.from_delayed:
问题:输出数据总是 1 个块,这使得进一步处理数据变得困难。 (之后重新分块会使 RAM 过载)使用 xr.Dataset.rolling.construct 的中级(惰性)二维变换。这里 1 维是时间,行是短时间 windows,我在其上执行 fft(“滚动 window”)。
使用数据:[1, 2, 3, 4, 5] 和 3 的滚动 window 这将变为:
时间索引 滚动window 00:00:00 NaN, NaN, 1 00:00:01 南, 1, 2 00:00:02 1, 2, 3 00:00:03 2、3、4 00:00:04 3、4、5 然后使用 xr.apply ufunc 来计算这个新数组上每个时间点行(矢量化)的 fft,也是懒惰的(具体请参见下面的示例)。 这是可行的,因为它是延迟计算的,所以它也适用于大数据集。
问题:
- 似乎比 scipy.signal.stft 慢(参见下面的基准测试)
- overlap/stepsize 无法更改(与 scipy stft 中的 noverlap 一样)。
- 是否真的需要中间步骤或者是否有立即计算滚动的方法window fft?
- 在某些情况下仍然会使 RAM 过载
其他:我也尝试了其他方法,例如用numba优化函数(但在np.fft.fft上效果不佳)但是为了保持这个已经很长post 简而言之,我只包含了我尝试过的最有希望的方法。
我该怎么做才能做到这一点,和/或提高方法 3 的性能或使用方法 1 或 2,或者使用 xarray / Dask 进行延迟和并行处理的其他方法?
谢谢!
代码/示例:
导入和测试数据集:
import xarray as xr
import dask.array as da
import dask
import numpy as np
from scipy import signal as ss
# variables
fs= 128 # rec frequency
_l = int(1e3) # data length
window = fs * 10 # window over which to do stft
# Create data
data = np.random.random(_l * fs)
data = xr.DataArray(da.from_array(data, name="x")).assign_coords({"dim_0":np.arange(_l * fs)})
data = data.chunk(chunks=_l/10 * fs) # create multiple chunks to simulate bigger data
data = data.to_dataset()
方法一:
def stft(data,):
f, t, Zxx = ss.stft(data,
fs=fs,
nperseg=window,
noverlap= window - 1
)
return np.abs(Zxx)
data['fft'] = xr.apply_ufunc(stft,
data.x,
input_core_dims=[['dim_0']],
output_core_dims=[["f", "t"]], # define newly created output dims
dask="parallelized",
output_dtypes=['f8'],
output_sizes={"f": window / 2 + 1,
"t": data.x.size - 1},
)
输出:
ValueError: dimension 'dim_0' on 0th function argument to apply_ufunc with dask='parallelized' consists of multiple chunks, but is also a core dimension. To fix, rechunk into a single dask array chunk along this dimension, i.e., ``.chunk({'dim_0': -1})``, but beware that this may significantly increase memory usage.
方法二:
def stft1(x):
f, t, Zxx = ss.stft(x,
fs=fs,
nperseg=window,
noverlap=window - 1
)
return np.abs(Zxx)
da.from_delayed(dask.delayed(stft1)(data.x), shape=(window / 2 + 1, data.x.size - 1), meta="f8")
输出:
方法三:(可行但不是最优?)
def xr_make_fft(ds, par):
ds['roll_window'] = np.arange(window) # create dimension for rolling window
# FFT function to apply vectorized:
def xr_fft(x):
fft = np.fft.fft(x)[xr_fft.idx]
return np.abs(fft)
# make new parameter with rolling windows stacked
ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",)
# Calc FFT Freq domain
fftfreq = np.fft.fftfreq(window, 1 / fs)
idx = (0 < fftfreq)
freq = fftfreq[idx]
ds['Frequency'] = freq
xr_fft.idx = idx
ds[f'FFT'] = xr.apply_ufunc(xr_fft,
ds[f"FFT_window"],
vectorize=True,
input_core_dims=[["roll_window"]], # define input dim over which to vectorize (this dim in inserted completely)
output_core_dims=[["Frequency"]], # define newly created output dims
dask="parallelized",
output_dtypes=['f8'],
output_sizes={"Frequency": len(freq)},
)
ds = ds.drop(f'FFT_window').drop_dims('roll_window')
return ds
data = xr_make_fft(data, "x")
输出:
小型数据集的性能测试: 128e3 长度数据(未超载 RAM 的最大数据集):
方法二:
%timeit da.from_delayed(dask.delayed(stft1)(data.x), shape=(window / 2 + 1, data.x.size - 1), meta="f8").compute()
3.53 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
方法三:
%timeit data.FFT.compute()
6.04 s ± 219 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.signal.stft
%timeit ss.stft(data.x, fs, nperseg=window, noverlap=window-1)[2]
1.87 s ± 8.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
经过测试多种方法,这似乎是最好的解决方案:
创建示例数据和导入:
import dask.array as da
import dask
import numpy as np
fs = 128 # sample rate
_l = int(5e6) # n samples, change if you like
data = da.random.random(_l * fs) # really long array with random numbers
解决方案(使用 dask 数组):
此解决方案使用 sliding_window_view 在数据上创建滑动 window 的二维变换,然后在该二维 window 上应用 fft(如 proposed/explained问题)。
如果需要,您始终可以将输出 dask 数组添加到 xarray 数据集。您还可以将此方法应用于嵌入 xarray 数据集中的 dask 数组。
第 1 步:
将数据分块,以便通过 2d 转换它具有大小合适的块(稍后重新分块是昂贵的/导致崩溃)。要测试哪种块大小最有效,请在小样本上进行测试并检查 fft_window(参见步骤 3)变量的块大小。
data = data.rechunk(chunks=24000) # 24000 works great on my 16 GB system adjust if needed
2d window with 重新分块:
2d window 没有 重新分块:
第 2 步:
计算 fft 参数并创建 fft 函数
FFT_WINDOW = 10 * fs # use a 10 second window
fftfreq = np.fft.fftfreq(FFT_WINDOW, 1 / fs) # create fft freq array
idx = (0 < fftfreq) # create index of positive freq
freq = fftfreq[idx] # create freq array
# create fft function:
da_fft = lambda x: da.absolute(da.fft.fft(x, axis=1)[:, idx]
第 3 步: 进行二维变换并计算 fft
with dask.config.set(**{'array.slicing.split_large_chunks': False}): # surpress warning
fft_window = da.overlap.sliding_window_view(data, FFT_WINDOW)[:, ::-1] # [:, ::-1] is to backward fill window
fft = da_fft(fft_window)
PAD_LEN = data.size - fft.shape[0]
fft = da.pad(fft, ((PAD_LEN, 0), (0, 0)), constant_values=np.nan) # pad to match original array
注意:如果 dask 版本 < 2021.03.1,请改用 numpy.lib.stride_tricks.sliding_window_view 和 da.map_overlap。性能相当
输出:
性能(超过100000点计算平均值):
问题中的方法 3(从此处添加步骤 1 以防止内存过载):
5.36 s ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
这里提出的方法:
1.83 s ± 52.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)