并行和延迟使用 Dask(和/或 xarray)的短时傅里叶变换(频谱分析)

Short Time Fourier Transform (spectrum analysis) in parallel & lazily using Dask (and / or xarray)

问题:

我正在尝试对长时间序列数据进行频谱分析(请参阅数据结构示例,它基本上是具有时间索引的一维数据)。为了节省时间和内存等。我想并行和懒惰地执行此操作(使用 xarray 和/或 dask)。
最好(或更好)的方法是什么?

我试过的:

(示例和代码见下文)

  1. 使用 scipy.signal.stft 和 xr.apply_ufunc:
    问题: ValueError,仅在输入数据为 1 个块时有效,不适用于大数据。

  2. 使用 scipy.signal.stft 与 dask.array.from_delayed:
    问题:输出数据总是 1 个块,这使得进一步处理数据变得困难。 (之后重新分块会使 RAM 过载)

  3. 使用 xr.Dataset.rolling.construct 的中级(惰性)二维变换。这里 1 维是时间,行是短时间 windows,我在其上执行 fft(“滚动 window”)。

    使用数据:[1, 2, 3, 4, 5] 和 3 的滚动 window 这将变为:

    时间索引 滚动window
    00:00:00 NaN, NaN, 1
    00:00:01 南, 1, 2
    00:00:02 1, 2, 3
    00:00:03 2、3、4
    00:00:04 3、4、5

    然后使用 xr.apply ufunc 来计算这个新数组上每个时间点行(矢量化)的 fft,也是懒惰的(具体请参见下面的示例)。 这是可行的,因为它是延迟计算的,所以它也适用于大数据集。

    问题:

    • 似乎比 scipy.signal.stft 慢(参见下面的基准测试)
    • overlap/stepsize 无法更改(与 scipy stft 中的 noverlap 一样)。
    • 是否真的需要中间步骤或者是否有立即计算滚动的方法window fft?
    • 在某些情况下仍然会使 RAM 过载

其他:我也尝试了其他方法,例如用numba优化函数(但在np.fft.fft上效果不佳)但是为了保持这个已经很长post 简而言之,我只包含了我尝试过的最有希望的方法。

我该怎么做才能做到这一点,和/或提高方法 3 的性能或使用方法 1 或 2,或者使用 xarray / Dask 进行延迟和并行处理的其他方法?

谢谢!

代码/示例:

导入和测试数据集:

import xarray as xr
import dask.array as da
import dask
import numpy as np
from scipy import signal as ss

# variables
fs= 128           # rec frequency
_l = int(1e3)     # data length
window = fs * 10  # window over which to do stft


# Create data 
data = np.random.random(_l * fs)
data = xr.DataArray(da.from_array(data, name="x")).assign_coords({"dim_0":np.arange(_l * fs)})
data = data.chunk(chunks=_l/10 * fs)   # create multiple chunks to simulate bigger data
data = data.to_dataset()

方法一:

def stft(data,):
    f, t, Zxx = ss.stft(data,
                        fs=fs, 
                        nperseg=window,
                        noverlap= window - 1
                        )    
    return np.abs(Zxx)

data['fft'] = xr.apply_ufunc(stft,
               data.x,
              input_core_dims=[['dim_0']],
              output_core_dims=[["f", "t"]],   # define newly created output dims
              dask="parallelized",
              output_dtypes=['f8'],
              output_sizes={"f": window / 2 + 1, 
                            "t": data.x.size - 1},
              )

输出:

ValueError: dimension 'dim_0' on 0th function argument to apply_ufunc with dask='parallelized' consists of multiple chunks, but is also a core dimension. To fix, rechunk into a single dask array chunk along this dimension, i.e., ``.chunk({'dim_0': -1})``, but beware that this may significantly increase memory usage.

方法二:

def stft1(x):
    f, t, Zxx = ss.stft(x,
                        fs=fs, 
                        nperseg=window,
                        noverlap=window - 1
                        )    
    return np.abs(Zxx)
    
da.from_delayed(dask.delayed(stft1)(data.x), shape=(window / 2 + 1, data.x.size - 1), meta="f8")

输出:

方法三:(可行但不是最优?)

def xr_make_fft(ds, par):
    ds['roll_window'] = np.arange(window)  # create dimension for rolling window
    
    # FFT function to apply vectorized:
    def xr_fft(x):
        fft = np.fft.fft(x)[xr_fft.idx]
        return np.abs(fft)
    
    # make new parameter with rolling windows stacked
    ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",)
    
    # Calc FFT Freq domain
    fftfreq = np.fft.fftfreq(window, 1 / fs)
    idx = (0 < fftfreq)
    freq = fftfreq[idx]
    ds['Frequency'] = freq
    xr_fft.idx = idx
    
    ds[f'FFT'] = xr.apply_ufunc(xr_fft,
                                ds[f"FFT_window"],
                                vectorize=True,
                                input_core_dims=[["roll_window"]],  # define input dim over which to vectorize (this dim in inserted completely)
                                output_core_dims=[["Frequency"]],   # define newly created output dims
                                dask="parallelized",
                                output_dtypes=['f8'],
                                output_sizes={"Frequency": len(freq)},
                                )
    
    ds = ds.drop(f'FFT_window').drop_dims('roll_window')
    return ds

data = xr_make_fft(data, "x")

输出:

小型数据集的性能测试: 128e3 长度数据(未超载 RAM 的最大数据集):

方法二:

%timeit da.from_delayed(dask.delayed(stft1)(data.x), shape=(window / 2 + 1, data.x.size - 1), meta="f8").compute()
3.53 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

方法三:

%timeit data.FFT.compute() 
6.04 s ± 219 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.signal.stft

%timeit ss.stft(data.x, fs, nperseg=window, noverlap=window-1)[2]
1.87 s ± 8.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

经过测试多种方法,这似乎是最好的解决方案:

创建示例数据和导入:

import dask.array as da
import dask
import numpy as np

fs = 128                            # sample rate
_l = int(5e6)                       # n samples, change if you like

data = da.random.random(_l * fs)    # really long array with random numbers

解决方案(使用 dask 数组):

此解决方案使用 sliding_window_view 在数据上创建滑动 window 的二维变换,然后在该二维 window 上应用 fft(如 proposed/explained问题)。

如果需要,您始终可以将输出 dask 数组添加到 xarray 数据集。您还可以将此方法应用于嵌入 xarray 数据集中的 dask 数组。

第 1 步:
将数据分块,以便通过 2d 转换它具有大小合适的块(稍后重新分块是昂贵的/导致崩溃)。要测试哪种块大小最有效,请在小样本上进行测试并检查 fft_window(参见步骤 3)变量的块大小。

data = data.rechunk(chunks=24000)  # 24000 works great on my 16 GB system adjust if needed

2d window with 重新分块:

2d window 没有 重新分块:

第 2 步:

计算 fft 参数并创建 fft 函数

FFT_WINDOW = 10 * fs                             # use a 10 second window
fftfreq = np.fft.fftfreq(FFT_WINDOW, 1 / fs)     # create fft freq array
idx = (0 < fftfreq)                              # create index of positive freq
freq = fftfreq[idx]                              # create freq  array

# create fft function:
da_fft = lambda x: da.absolute(da.fft.fft(x, axis=1)[:, idx]

第 3 步: 进行二维变换并计算 fft

with dask.config.set(**{'array.slicing.split_large_chunks': False}):          # surpress warning
    fft_window = da.overlap.sliding_window_view(data, FFT_WINDOW)[:, ::-1]   # [:, ::-1] is to backward fill window
    fft = da_fft(fft_window)
    PAD_LEN = data.size - fft.shape[0]
    fft = da.pad(fft, ((PAD_LEN, 0), (0, 0)), constant_values=np.nan)         # pad to match original array

注意:如果 dask 版本 < 2021.03.1,请改用 numpy.lib.stride_tricks.sliding_window_view 和 da.map_overlap。性能相当

输出:

性能(超过100000点计算平均值):

  • 问题中的方法 3(从此处添加步骤 1 以防止内存过载):
    5.36 s ± 72 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

  • 这里提出的方法:
    1.83 s ± 52.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)