具有不同时间步长的卡尔曼滤波器

Kalman filter with varying timesteps

我有一些数据表示从两个不同的传感器测量到的物体的位置。所以,我需要做传感器融合。更困难的问题是来自每个传感器的数据基本上是随机到达的。我想使用 pykalman 来融合和平滑数据。 pykalman 如何处理变量时间戳数据?

数据的简化样本如下所示:

import pandas as pd
data={'time':\
['10:00:00.0','10:00:01.0','10:00:05.2','10:00:07.5','10:00:07.5','10:00:12.0','10:00:12.5']\
,'X':[10,10.1,20.2,25.0,25.1,35.1,35.0],'Y':[20,20.2,41,45,47,75.0,77.2],\
'Sensor':[1,2,1,1,2,1,2]}

df=pd.DataFrame(data,columns=['time','X','Y','Sensor'])
df.time=pd.to_datetime(df.time)
df=df.set_index('time')

还有这个:

df
Out[130]: 
                            X     Y  Sensor
time                                       
2017-12-01 10:00:00.000  10.0  20.0       1
2017-12-01 10:00:01.000  10.1  20.2       2
2017-12-01 10:00:05.200  20.2  41.0       1
2017-12-01 10:00:07.500  25.0  45.0       1
2017-12-01 10:00:07.500  25.1  47.0       2
2017-12-01 10:00:12.000  35.1  75.0       1
2017-12-01 10:00:12.500  35.0  77.2       2

对于传感器融合问题,我认为我可以重塑数据,使位置 X1、Y1、X2、Y2 具有一堆缺失值,而不仅仅是 X、Y。 (这是相关的:

那么我的数据可以是这样的:

df['X1']=df.X[df.Sensor==1]
df['Y1']=df.Y[df.Sensor==1]
df['X2']=df.X[df.Sensor==2]
df['Y2']=df.Y[df.Sensor==2]
df
Out[132]: 
                            X     Y  Sensor    X1    Y1    X2    Y2
time                                                               
2017-12-01 10:00:00.000  10.0  20.0       1  10.0  20.0   NaN   NaN
2017-12-01 10:00:01.000  10.1  20.2       2   NaN   NaN  10.1  20.2
2017-12-01 10:00:05.200  20.2  41.0       1  20.2  41.0   NaN   NaN
2017-12-01 10:00:07.500  25.0  45.0       1  25.0  45.0  25.1  47.0
2017-12-01 10:00:07.500  25.1  47.0       2  25.0  45.0  25.1  47.0
2017-12-01 10:00:12.000  35.1  75.0       1  35.1  75.0   NaN   NaN
2017-12-01 10:00:12.500  35.0  77.2       2   NaN   NaN  35.0  77.2

pykalman 的文档表明它可以处理丢失的数据,但这是正确的吗?

但是,pykalman 的文档对可变时间问题一点也不清楚。文档只是说:

"Both the Kalman Filter and Kalman Smoother are able to use parameters which vary with time. In order to use this, one need only pass in an array n_timesteps in length along its first axis:"

>>> transition_offsets = [[-1], [0], [1], [2]]
>>> kf = KalmanFilter(transition_offsets=transition_offsets, n_dim_obs=1)

我找不到任何使用可变时间步长的 pykalman Smoother 的例子。因此,任何使用我上述数据的指导、示例甚至示例都会非常有帮助。 我没有必要使用 pykalman,但它似乎是平滑此数据的有用工具。

***** 下面添加了额外的代码 @Anton 我制作了一个使用 smooth 函数的有用代码版本。奇怪的是,它似乎以相同的权重对待每一个观察结果,并且轨迹贯穿每一个观察结果。即使我在传感器方差值之间存在很大差异。我希望在 5.4、5.0 点附近,过滤后的轨迹应该更接近传感器 1 点,因为该点具有较低的方差。相反,轨迹正好到达每个点,然后大转弯到达那里。

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-4,     0,     0,     0], 
     [   0,  1e-4,     0,     0],
     [   0,     0,  1e-4,     0],
     [   0,     0,     0,  1e-4]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

import numpy.ma as ma

obs_cov=np.zeros([n_timesteps,2,2])
obs=np.zeros([n_timesteps,2])

for t in range(n_timesteps):
    if Sensor[t] == 0:
        obs[t]=None
    else:
        obs[t] = [X[t], Y[t]]
        if Sensor[t] == 1:
            obs_cov[t] = np.asarray(R_1)
        else:
            obs_cov[t] = np.asarray(R_2)

ma_obs=ma.masked_invalid(obs)

ma_obs_cov=ma.masked_invalid(obs_cov)

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = ma_obs_cov, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)

filtered_state_means, filtered_state_covariances=kf.smooth(ma_obs)


# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1")
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2")
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

对于卡尔曼滤波器,用恒定时间步长表示输入数据很有用。您的传感器随机发送数据,因此您可以为您的系统定义最小的有效时间步长,并使用此步长离散化时间轴。

例如,您的一个传感器大约每 0.2 秒发送一次数据,第二个传感器每 0.5 秒发送一次数据。所以最小的时间步长可以是 0.01 秒(这里你需要在计算时间和期望的精度之间找到一个权衡)。

您的数据将如下所示:

Time    Sensor  X       Y
0,52        0   0       0
0,53        1   0,3417  0,2988
0,54        0   0       0
0,56        0   0       0
0,57        0   0       0
0,55        0   0       0
0,58        0   0       0
0,59        2   0,4247  0,3779
0,60        0   0       0
0,61        0   0       0
0,62        0   0       0

现在您需要根据您的观察调用 Pykalman 函数 filter_update。如果没有观察,过滤器会根据前一个状态预测下一个状态。如果有观察,它会更正系统状态。

可能是您的传感器精度不同。因此,您可以根据传感器方差指定观测协方差。

为了演示这个想法,我生成了一个 2D 轨迹并随机放置了 2 个具有不同精度的传感器的测量值。

Sensor1: mean update time = 1.0s; max error = 0.1m;
Sensor2: mean update time = 0.7s; max error = 0.3m;

结果如下:

我故意选择了非常糟糕的参数,所以可以看到预测和校正步骤。在传感器更新之间,过滤器根据上一步的恒定速度预测轨迹。一旦更新到来,过滤器就会根据传感器的方差纠正位置。第二个传感器的精度很差,所以影响系统重量比较轻

这是我的 python 代码:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-4,     0,     0,     0], 
     [   0,  1e-4,     0,     0],
     [   0,     0,  1e-4,     0],
     [   0,     0,     0,  1e-4]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0
    else:

        # the observation and its covariance have to be switched depending on Sensor_Id 
        #     Sensor_ID == 0: no observation
        #     Sensor_ID == 1: Sensor 1
        #     Sensor_ID == 2: Sensor 2

        if Sensor[t] == 0:
            obs = None
            obs_cov = None
        else:
            obs = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov = np.asarray(R_1)
            else:
                obs_cov = np.asarray(R_2)

        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = obs,
            observation_covariance = obs_cov)
        )

# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1")
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2")
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

我放了 csv 文件 here 这样你就可以执行代码了。

希望能帮到你。

更新

关于您关于可变转移矩阵的建议的一些信息。我会说这取决于您的传感器的可用性以及对估计结果的要求。

这里我用一个常量和一个变量转换矩阵绘制了相同的估计(我改变了转换协方差矩阵,否则估计太差了,因为高过滤器"stiffness"):

如你所见,黄色标记的估计位置非常好。 但是! 您没有传感器读数之间的信息。使用可变转换矩阵,您可以避免读数之间的预测步骤,并且不知道系统会发生什么。如果您的阅读率很高,这可能就足够了,否则可能是一个劣势。

这是更新后的代码:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-2,     0,     0,     0], 
     [   0,  1e-2,     0,     0],
     [   0,     0,  1e-2,     0],
     [   0,     0,     0,  1e-2]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4

filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

filtered_state_means2 = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances2 = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)

# Kalman-Filter initialization (Different F Matrices depending on DT)
kf2 = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0

        # For second filter
        filtered_state_means2[t] = X0
        filtered_state_covariances2[t] = P0

        timestamp = Time[t]
        old_t = t
    else:

        # the observation and its covariance have to be switched depending on Sensor_Id 
        #     Sensor_ID == 0: no observation
        #     Sensor_ID == 1: Sensor 1
        #     Sensor_ID == 2: Sensor 2

        if Sensor[t] == 0:
            obs = None
            obs_cov = None
        else:
            obs = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov = np.asarray(R_1)
            else:
                obs_cov = np.asarray(R_2)

        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = obs,
            observation_covariance = obs_cov)
        )

        #For the second filter
        if Sensor[t] != 0:

            obs2 = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov2 = np.asarray(R_1)
            else:
                obs_cov2 = np.asarray(R_2)  

            dt2 = Time[t] - timestamp

            timestamp = Time[t]        

            # transition_matrix  
            F2 = [[1,  0,  dt2,    0], 
                  [0,  1,    0,  dt2],
                  [0,  0,    1,    0],
                  [0,  0,    0,    1]] 

            filtered_state_means2[t], filtered_state_covariances2[t] = (
            kf2.filter_update(
                filtered_state_means2[old_t],
                filtered_state_covariances2[old_t],
                observation = obs2,
                observation_covariance = obs_cov2,
                transition_matrix = np.asarray(F2))
            )      

            old_t = t

# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1", markersize=9)
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2", markersize=9)
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.plot(filtered_state_means2[:, 0], filtered_state_means2[:, 1], "yo", label="Filtered Trajectory 2", markersize=6)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

我没有在这段代码中实现的另一个重要点:在使用可变转换矩阵时,您还需要改变转换协方差矩阵(取决于当前的 dt)。

这是个有趣的话题。让我知道哪种估计最适合您的问题。