pykalman:(默认)处理缺失值

pykalman: (default) handling of missing values

我正在使用 pykalman 模块中的 KalmanFilter,想知道它如何处理缺失的观测值。根据文档:

In real world systems, it is common to have sensors occasionally fail. The Kalman Filter, Kalman Smoother, and EM algorithm are all equipped to handle this scenario. To make use of it, one only need apply a NumPy mask to the measurement at the missing time step:

from numpy import ma X = ma.array([1,2,3]) X1 = ma.masked # hide measurement at time step 1 kf.em(X).smooth(X)

我们可以平滑输入时间序列。由于这是一个 "additional" 函数,我认为它不会自动完成;那么在变量中包含 NaN 时默认方法是什么?

这里解释了可能发生的理论方法;这也是 pykalman 所做的吗(我认为这真的很棒):

Cross Validated - How to handle incomplete data in Kalman Filter?

我们来看看源码:

filter_update函数中pykalman检查当前观察是否被屏蔽。

def filter_update(...)

        # Make a masked observation if necessary
        if observation is None:
            n_dim_obs = observation_covariance.shape[0]
            observation = np.ma.array(np.zeros(n_dim_obs))
            observation.mask = True
        else:
            observation = np.ma.asarray(observation) 

它不影响预测步骤。但是校正步骤有两种选择。它发生在 _filter_correct 函数中。

def _filter_correct(...)

    if not np.any(np.ma.getmask(observation)):

         # the normal Kalman Filter math

    else:
        n_dim_state = predicted_state_covariance.shape[0]
        n_dim_obs = observation_matrix.shape[0]
        kalman_gain = np.zeros((n_dim_state, n_dim_obs))

        # !!!! the corrected state takes the result of the prediction !!!!

        corrected_state_mean = predicted_state_mean
        corrected_state_covariance = predicted_state_covariance

正如您所见,这正是理论方法。

这里有一个简短的例子和可以使用的工作数据。

假设你有一个 GPS 接收器并且你想在走路时跟踪自己。接收器具有良好的精度。为简化起见,假设您只向前走。

没有什么有趣的事情发生。由于 gps 信号良好,过滤器可以很好地估计您的位置。如果一段时间没有信号怎么办?

滤波器只能根据现有状态和系统动力学知识进行预测(参见矩阵 Q)。随着每一个预测步骤,不确定性都会增加。估计位置周围的 1-Sigma 范围变大。一旦再次出现新的观察,状态就会得到纠正。

这是代码和 data:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt
from numpy import ma

# enable or disable missing observations
use_mask = 1

# reading data (quick and dirty)
Time=[]
X=[]

for line in open('data/dataset_01.csv'):
    f1, f2  = line.split(';')
    Time.append(float(f1))
    X.append(float(f2))

if (use_mask):
    X = ma.asarray(X)
    X[300:500] = ma.masked

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  dt,   0.5*dt*dt], 
     [0,   1,          dt],
     [0,   0,           1]]  

# observation_matrix   
H = [1, 0, 0]

# transition_covariance 
Q = [[   1,     0,     0], 
     [   0,  1e-4,     0],
     [   0,     0,  1e-6]] 

# observation_covariance 
R = [0.04] # max error = 0.6m

# initial_state_mean
X0 = [0,
      0,
      0]

# initial_state_covariance
P0 = [[ 10,    0,   0], 
      [  0,    1,   0],
      [  0,    0,   1]]

n_timesteps = len(Time)
n_dim_state = 3

filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R, 
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0
    else:
        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = X[t])
        )

position_sigma = np.sqrt(filtered_state_covariances[:, 0, 0]);        

# plot of the resulted trajectory        
plt.plot(Time, filtered_state_means[:, 0], "g-", label="Filtered position", markersize=1)
plt.plot(Time, filtered_state_means[:, 0] + position_sigma, "r--", label="+ sigma", markersize=1)
plt.plot(Time, filtered_state_means[:, 0] - position_sigma, "r--", label="- sigma", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.xlabel("Time (s)")
plt.ylabel("Position (m)")
plt.show()      

更新

如果屏蔽更长的周期 (300:700),看起来会更有趣。

如你所见,位置又回来了。这是因为转换矩阵 F 绑定了位置、速度和加速度的预测。

如果您查看速度状态,它会解释下降位置。

在时间点 300 s 加速度停止。速度以恒定斜率下降并穿过 0 值。在这个时间点之后位置必须返回。

要绘制速度,请使用以下代码:

velocity_sigma = np.sqrt(filtered_state_covariances[:, 1, 1]);     

# plot of the estimated velocity        
plt.plot(Time, filtered_state_means[:, 1], "g-", label="Filtered velocity", markersize=1)
plt.plot(Time, filtered_state_means[:, 1] + velocity_sigma, "r--", label="+ sigma", markersize=1)
plt.plot(Time, filtered_state_means[:, 1] - velocity_sigma, "r--", label="- sigma", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.xlabel("Time (s)")
plt.ylabel("Velocity (m/s)")
plt.show()