Фильтр Калмана с различными временными шагами

У меня есть некоторые данные, которые представляют местоположение объекта, измеренного двумя разными датчиками. Итак, мне нужно сделать датчик слияния. Более сложная проблема заключается в том, что данные от каждого датчика поступают в основном в случайное время. Я хотел бы использовать pykalman, чтобы объединить и сгладить данные. Как pykalman может обрабатывать переменные метки времени?

Упрощенная выборка данных будет выглядеть так:

import pandas as pd
data={'time':\
['10:00:00.0','10:00:01.0','10:00:05.2','10:00:07.5','10:00:07.5','10:00:12.0','10:00:12.5']\
,'X':[10,10.1,20.2,25.0,25.1,35.1,35.0],'Y':[20,20.2,41,45,47,75.0,77.2],\
'Sensor':[1,2,1,1,2,1,2]}

df=pd.DataFrame(data,columns=['time','X','Y','Sensor'])
df.time=pd.to_datetime(df.time)
df=df.set_index('time')

И это:

df
Out[130]: 
                            X     Y  Sensor
time                                       
2017-12-01 10:00:00.000  10.0  20.0       1
2017-12-01 10:00:01.000  10.1  20.2       2
2017-12-01 10:00:05.200  20.2  41.0       1
2017-12-01 10:00:07.500  25.0  45.0       1
2017-12-01 10:00:07.500  25.1  47.0       2
2017-12-01 10:00:12.000  35.1  75.0       1
2017-12-01 10:00:12.500  35.0  77.2       2

Что касается вопроса слияния датчиков, я думаю, что я могу просто изменить форму данных, чтобы у меня были позиции X1,Y1,X2,Y2 с кучей пропущенных значений вместо просто X,Y. (Это было связано: http://stackru.com.mevn.net/questions/47386426/2-sensor-readings-fusion-yaw-pitch)

Итак, мои данные могут выглядеть так:

df['X1']=df.X[df.Sensor==1]
df['Y1']=df.Y[df.Sensor==1]
df['X2']=df.X[df.Sensor==2]
df['Y2']=df.Y[df.Sensor==2]
df
Out[132]: 
                            X     Y  Sensor    X1    Y1    X2    Y2
time                                                               
2017-12-01 10:00:00.000  10.0  20.0       1  10.0  20.0   NaN   NaN
2017-12-01 10:00:01.000  10.1  20.2       2   NaN   NaN  10.1  20.2
2017-12-01 10:00:05.200  20.2  41.0       1  20.2  41.0   NaN   NaN
2017-12-01 10:00:07.500  25.0  45.0       1  25.0  45.0  25.1  47.0
2017-12-01 10:00:07.500  25.1  47.0       2  25.0  45.0  25.1  47.0
2017-12-01 10:00:12.000  35.1  75.0       1  35.1  75.0   NaN   NaN
2017-12-01 10:00:12.500  35.0  77.2       2   NaN   NaN  35.0  77.2

Документы для pykalman указывают, что он может обрабатывать недостающие данные, но верно ли это?

Но документы для pykalman не совсем ясно о проблеме переменного времени. Док просто говорит:

"Как Kalman Filter, так и Kalman Smoother могут использовать параметры, которые меняются со временем. Чтобы использовать это, нужно только передать массив длиной n_timesteps вдоль его первой оси:"

>>> transition_offsets = [[-1], [0], [1], [2]]
>>> kf = KalmanFilter(transition_offsets=transition_offsets, n_dim_obs=1)

Я не смог найти каких-либо примеров использования пикальмановского Smoother с переменными временными шагами. Таким образом, любые рекомендации, примеры или даже примеры, использующие мои данные выше, были бы очень полезны. Мне не нужно использовать pykalman, но это кажется полезным инструментом для сглаживания этих данных.

***** Дополнительный код добавлен ниже @Anton Я сделал версию вашего полезного кода, которая использует функцию сглаживания. Странно то, что кажется, что все наблюдения имеют одинаковый вес, и траектория проходит через все. Даже если у меня есть большое различие между значениями дисперсии датчика. Я ожидаю, что около точки 5,4,5,0 отфильтрованная траектория должна приблизиться к точке датчика 1, так как эта точка имеет меньшую дисперсию. Вместо этого траектория идет точно к каждой точке и делает большой поворот, чтобы туда добраться.

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-4,     0,     0,     0], 
     [   0,  1e-4,     0,     0],
     [   0,     0,  1e-4,     0],
     [   0,     0,     0,  1e-4]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

import numpy.ma as ma

obs_cov=np.zeros([n_timesteps,2,2])
obs=np.zeros([n_timesteps,2])

for t in range(n_timesteps):
    if Sensor[t] == 0:
        obs[t]=None
    else:
        obs[t] = [X[t], Y[t]]
        if Sensor[t] == 1:
            obs_cov[t] = np.asarray(R_1)
        else:
            obs_cov[t] = np.asarray(R_2)

ma_obs=ma.masked_invalid(obs)

ma_obs_cov=ma.masked_invalid(obs_cov)

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = ma_obs_cov, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)

filtered_state_means, filtered_state_covariances=kf.smooth(ma_obs)


# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1")
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2")
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

1 ответ

Решение

Для фильтра Калмана полезно представлять входные данные с постоянным шагом по времени. Ваши датчики отправляют данные случайным образом, так что вы можете определить наименьший значимый временной шаг для вашей системы и дискретизировать временную ось с помощью этого шага.

Например, один из ваших датчиков отправляет данные примерно каждые 0,2 секунды, а второй каждые 0,5 секунды. Таким образом, наименьший шаг по времени может составлять 0,01 секунды (здесь вам нужно найти компромисс между временем вычислений и желаемой точностью).

Ваши данные будут выглядеть так:

Time    Sensor  X       Y
0,52        0   0       0
0,53        1   0,3417  0,2988
0,54        0   0       0
0,56        0   0       0
0,57        0   0       0
0,55        0   0       0
0,58        0   0       0
0,59        2   0,4247  0,3779
0,60        0   0       0
0,61        0   0       0
0,62        0   0       0

Теперь вам нужно вызвать функцию Pykalman filter_update в зависимости от ваших наблюдений. Если наблюдения отсутствуют, фильтр прогнозирует следующее состояние на основе предыдущего. Если есть наблюдение, оно исправляет состояние системы.

Вероятно, ваши датчики имеют разную точность. Таким образом, вы можете указать ковариацию наблюдения в зависимости от дисперсии датчика.

Чтобы продемонстрировать идею, я сгенерировал 2D-траекторию и случайно поместил измерения 2 датчиков с разной точностью.

Sensor1: mean update time = 1.0s; max error = 0.1m;
Sensor2: mean update time = 0.7s; max error = 0.3m;

Вот результат:

Фильтр Пикальмана Калмана с отсутствующими наблюдениями

Я специально выбрал действительно плохие параметры, чтобы можно было увидеть этапы прогнозирования и коррекции. Между обновлениями датчика фильтр прогнозирует траекторию на основе постоянной скорости от предыдущего шага. Как только приходит обновление, фильтр корректирует положение в соответствии с отклонением датчика. Точность второго датчика очень плохая, поэтому он влияет на систему с меньшим весом.

Вот мой код Python:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-4,     0,     0,     0], 
     [   0,  1e-4,     0,     0],
     [   0,     0,  1e-4,     0],
     [   0,     0,     0,  1e-4]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0
    else:

        # the observation and its covariance have to be switched depending on Sensor_Id 
        #     Sensor_ID == 0: no observation
        #     Sensor_ID == 1: Sensor 1
        #     Sensor_ID == 2: Sensor 2

        if Sensor[t] == 0:
            obs = None
            obs_cov = None
        else:
            obs = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov = np.asarray(R_1)
            else:
                obs_cov = np.asarray(R_2)

        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = obs,
            observation_covariance = obs_cov)
        )

# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1")
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2")
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

Я поместил CSV-файл здесь, чтобы вы могли выполнить код.

Я надеюсь, что смогу помочь тебе.

ОБНОВИТЬ

Некоторая информация к вашему предложению о переменной матрице перехода. Я бы сказал, что это зависит от доступности ваших датчиков и от требований к результату оценки.

Здесь я построил одну и ту же оценку как с постоянной, так и с переменной матрицей перехода (я изменил ковариационную матрицу перехода, в противном случае оценка была слишком плохой из-за высокой "жесткости" фильтра):

Фильтр Калмана: оценка Пикальмана с постоянной и переменной матрицами переходов

Как видите, примерное положение желтых маркеров довольно хорошее. НО! у вас нет информации между показаниями датчика. Используя переменную матрицу перехода, вы избегаете шага прогнозирования между показаниями и не знаете, что происходит с системой. Это может быть достаточно хорошо, если ваши показания идут с высокой скоростью, но в противном случае это может быть недостатком.

Вот обновленный код:

from pykalman import KalmanFilter
import numpy as np
import matplotlib.pyplot as plt

# reading data (quick and dirty)
Time=[]
RefX=[]
RefY=[]
Sensor=[]
X=[]
Y=[]

for line in open('data/dataset_01.csv'):
    f1, f2, f3, f4, f5, f6 = line.split(';')
    Time.append(float(f1))
    RefX.append(float(f2))
    RefY.append(float(f3))
    Sensor.append(float(f4))
    X.append(float(f5))
    Y.append(float(f6))

# Sensor 1 has a higher precision (max error = 0.1 m)
# Sensor 2 has a lower precision (max error = 0.3 m)

# Variance definition through 3-Sigma rule
Sensor_1_Variance = (0.1/3)**2;
Sensor_2_Variance = (0.3/3)**2;

# Filter Configuration

# time step
dt = Time[2] - Time[1]

# transition_matrix  
F = [[1,  0,  dt,   0], 
     [0,  1,   0,  dt],
     [0,  0,   1,   0],
     [0,  0,   0,   1]]   

# observation_matrix   
H = [[1, 0, 0, 0],
     [0, 1, 0, 0]]

# transition_covariance 
Q = [[1e-2,     0,     0,     0], 
     [   0,  1e-2,     0,     0],
     [   0,     0,  1e-2,     0],
     [   0,     0,     0,  1e-2]] 

# observation_covariance 
R_1 = [[Sensor_1_Variance, 0],
       [0, Sensor_1_Variance]]

R_2 = [[Sensor_2_Variance, 0],
       [0, Sensor_2_Variance]]

# initial_state_mean
X0 = [0,
      0,
      0,
      0]

# initial_state_covariance - assumed a bigger uncertainty in initial velocity
P0 = [[  0,    0,   0,   0], 
      [  0,    0,   0,   0],
      [  0,    0,   1,   0],
      [  0,    0,   0,   1]]

n_timesteps = len(Time)
n_dim_state = 4

filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))

filtered_state_means2 = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances2 = np.zeros((n_timesteps, n_dim_state, n_dim_state))

# Kalman-Filter initialization
kf = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)

# Kalman-Filter initialization (Different F Matrices depending on DT)
kf2 = KalmanFilter(transition_matrices = F, 
                  observation_matrices = H, 
                  transition_covariance = Q, 
                  observation_covariance = R_1, # the covariance will be adapted depending on Sensor_ID
                  initial_state_mean = X0, 
                  initial_state_covariance = P0)


# iterative estimation for each new measurement
for t in range(n_timesteps):
    if t == 0:
        filtered_state_means[t] = X0
        filtered_state_covariances[t] = P0

        # For second filter
        filtered_state_means2[t] = X0
        filtered_state_covariances2[t] = P0

        timestamp = Time[t]
        old_t = t
    else:

        # the observation and its covariance have to be switched depending on Sensor_Id 
        #     Sensor_ID == 0: no observation
        #     Sensor_ID == 1: Sensor 1
        #     Sensor_ID == 2: Sensor 2

        if Sensor[t] == 0:
            obs = None
            obs_cov = None
        else:
            obs = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov = np.asarray(R_1)
            else:
                obs_cov = np.asarray(R_2)

        filtered_state_means[t], filtered_state_covariances[t] = (
        kf.filter_update(
            filtered_state_means[t-1],
            filtered_state_covariances[t-1],
            observation = obs,
            observation_covariance = obs_cov)
        )

        #For the second filter
        if Sensor[t] != 0:

            obs2 = [X[t], Y[t]]

            if Sensor[t] == 1:
                obs_cov2 = np.asarray(R_1)
            else:
                obs_cov2 = np.asarray(R_2)  

            dt2 = Time[t] - timestamp

            timestamp = Time[t]        

            # transition_matrix  
            F2 = [[1,  0,  dt2,    0], 
                  [0,  1,    0,  dt2],
                  [0,  0,    1,    0],
                  [0,  0,    0,    1]] 

            filtered_state_means2[t], filtered_state_covariances2[t] = (
            kf2.filter_update(
                filtered_state_means2[old_t],
                filtered_state_covariances2[old_t],
                observation = obs2,
                observation_covariance = obs_cov2,
                transition_matrix = np.asarray(F2))
            )      

            old_t = t

# extracting the Sensor update points for the plot        
Sensor_1_update_index = [i for i, x in enumerate(Sensor) if x == 1]    
Sensor_2_update_index = [i for i, x in enumerate(Sensor) if x == 2]     

Sensor_1_update_X = [ X[i] for i in Sensor_1_update_index ]        
Sensor_1_update_Y = [ Y[i] for i in Sensor_1_update_index ]   

Sensor_2_update_X = [ X[i] for i in Sensor_2_update_index ]        
Sensor_2_update_Y = [ Y[i] for i in Sensor_2_update_index ] 

# plot of the resulted trajectory
plt.plot(RefX, RefY, "k-", label="Real Trajectory")
plt.plot(Sensor_1_update_X, Sensor_1_update_Y, "ro", label="Sensor 1", markersize=9)
plt.plot(Sensor_2_update_X, Sensor_2_update_Y, "bo", label="Sensor 2", markersize=9)
plt.plot(filtered_state_means[:, 0], filtered_state_means[:, 1], "g.", label="Filtered Trajectory", markersize=1)
plt.plot(filtered_state_means2[:, 0], filtered_state_means2[:, 1], "yo", label="Filtered Trajectory 2", markersize=6)
plt.grid()
plt.legend(loc="upper left")
plt.show()    

Еще один важный момент, который я не реализовал в этом коде: при использовании переменной матрицы перехода необходимо также изменять ковариационную матрицу перехода (в зависимости от текущего значения dt).

Это интересная тема. Дайте мне знать, какая оценка лучше всего подходит для вашей задачи.

Другие вопросы по тегам