Preface
This post focuses on the mathematical derivation of the various Kalman filters and does not go into any particular sensor. Once the mathematical model of the Kalman filter is understood, applying it to a specific sensor only requires plugging that sensor's measurement model into the motion/observation model. Fusion schemes built on specific sensors will be summarized separately later, while reading the relevant papers.
1. A Probabilistic Model of Localization in SLAM
In the SLAM problem, what we want to solve with a filtering method is a posterior probability: given a sequence of observations (and inputs) together with a prior pose at the initial time, estimate the pose at every time step:

$$p(x_k \mid x_0, u_{1:k}, z_{1:k})$$

where $x_0$ is the prior pose, $u_k$ is the input at time $k$, and $z_k$ is the observation of the environment at time $k$. These variables can be represented with a graphical model:
As the graph shows, under this model each state depends only on the previous pose and the current input, not on the full history of poses; this is the first-order Markov property. Likewise, each observation depends only on the pose at the corresponding time (and on the environment). Expanding the posterior with Bayes' rule gives:

$$p(x_k \mid x_0, u_{1:k}, z_{1:k}) \propto p(z_k \mid x_k)\int p(x_k \mid x_{k-1}, u_k)\,p(x_{k-1} \mid x_0, u_{1:k-1}, z_{1:k-1})\,\mathrm{d}x_{k-1}$$
This is the most basic Bayes filter for the posterior of the current state, and it consists of three parts:

- $p(z_k \mid x_k)$: the observation of the environment given the current state, at the current time
- $p(x_k \mid x_{k-1}, u_k)$: the predicted state transition from the previous time to the current time
- $p(x_{k-1} \mid x_0, u_{1:k-1}, z_{1:k-1})$: the state posterior at the previous time
In plain terms, the Bayes filter proceeds as follows:

- First, obtain the posterior distribution of the state at the previous time.
- For every possible previous state, predict the current state through the motion model, yielding the prior distribution of the current state.
- Combine the prior distribution of the current state with the probability distribution of the current observation to compute the posterior distribution of the current state.
This procedure involves two questions: how to predict the current state from the previous state and the input, and how to obtain the distribution of observations given the current state. They correspond to the two models of the system:

Motion model: $x_k = f(x_{k-1}, u_k, w_k)$

Observation model: $z_k = h(x_k, m, v_k)$

where $w_k$ and $v_k$ are the noises of the motion and observation models, respectively, and $m$ is the environment (map) information. If the problem does not require estimating the environment, $m$ can be ignored; in most visual SLAM systems, however, the feature points in the environment are optimized as well, in which case they enter the optimization as environment information.
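The three-step loop above can be made concrete with a minimal one-dimensional discrete Bayes filter. This is only an illustrative sketch: the robot on a 5-cell ring, the motion success probability, and the sensor confusion probabilities are all made-up numbers.

```python
import numpy as np

# Hypothetical example: a robot on a ring of 5 cells. Each step it tries to
# move one cell to the right; both motion and measurement are noisy.
n_cells = 5
belief = np.full(n_cells, 1.0 / n_cells)  # posterior at time k-1 (uniform prior)

def predict(belief):
    # Motion model: the intended +1 step succeeds with p=0.8, stays with p=0.2
    prior = np.zeros_like(belief)
    for i, p in enumerate(belief):
        prior[(i + 1) % n_cells] += 0.8 * p
        prior[i] += 0.2 * p
    return prior

def update(prior, measured_cell):
    # Observation model: the sensor reports the true cell with p=0.7,
    # and each adjacent cell with p=0.15
    likelihood = np.zeros(n_cells)
    likelihood[measured_cell] = 0.7
    likelihood[(measured_cell - 1) % n_cells] = 0.15
    likelihood[(measured_cell + 1) % n_cells] = 0.15
    posterior = likelihood * prior
    return posterior / posterior.sum()

prior = predict(belief)       # step 2: prior of the current state
posterior = update(prior, 1)  # step 3: fuse the prior with the observation z=1
```

Every filter in the rest of this post is a special case of this predict/update loop, differing only in how the distributions are represented.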
2. Kalman Filter
Derivation of the original Kalman filter
The Kalman filter assumes that the pose and the observations are Gaussian and that both the motion model and the observation model are linear, so the two models can be written as:

$$x_k = A x_{k-1} + B u_k + w_k,\quad w_k \sim \mathcal{N}(0, Q)$$

$$z_k = H x_k + v_k,\quad v_k \sim \mathcal{N}(0, R)$$

To distinguish the prior and posterior estimates of the state, write the prior estimate as $\bar{x}_k$ (with covariance $\bar{P}_k$) and the posterior estimate as $\hat{x}_k$ (with covariance $\hat{P}_k$). In the motion model, the posterior of the previous state is:

$$x_{k-1} \sim \mathcal{N}(\hat{x}_{k-1}, \hat{P}_{k-1})$$

The prediction of the current state obtained from the motion model is also Gaussian. The mean and covariance of this prediction (the prior distribution) follow from the motion model and the propagation of covariance under linear transformations of Gaussians:

$$\bar{x}_k = A \hat{x}_{k-1} + B u_k,\qquad \bar{P}_k = A \hat{P}_{k-1} A^T + Q$$
Clearly, because motion noise is injected in this step, the uncertainty of the predicted state is larger than that of the previous state. The role of the observation model is to estimate the posterior of the current state from the measurement and thereby reduce the uncertainty again.
The Kalman filter can be divided into the following stages.

First, compute the mean and covariance of the predicted state at the current time:

$$\bar{x}_k = A \hat{x}_{k-1} + B u_k,\qquad \bar{P}_k = A \hat{P}_{k-1} A^T + Q$$

Then compute the following quantity, defined as the Kalman gain:

$$K_k = \bar{P}_k H^T \left(H \bar{P}_k H^T + R\right)^{-1}$$

Finally, update the prediction with the Kalman gain and the measurement:

$$\hat{x}_k = \bar{x}_k + K_k \left(z_k - H \bar{x}_k\right),\qquad \hat{P}_k = (I - K_k H)\,\bar{P}_k$$
```python
import numpy as np

def kf_predict(X0, P0, A, Q, B, U1):
    # Propagate the previous posterior (X0, P0) through the linear motion model
    X10 = A @ X0 + B @ U1
    P10 = A @ P0 @ A.T + Q
    return X10, P10

def kf_update(X10, P10, Z, H, R):
    V = Z - H @ X10                                    # innovation
    K = P10 @ H.T @ np.linalg.pinv(H @ P10 @ H.T + R)  # Kalman gain
    X1 = X10 + K @ V
    # Fixed: the original used the scalar 1 instead of the identity matrix here
    P1 = (np.eye(X10.shape[0]) - K @ H) @ P10
    return X1, P1, K
```
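To see the equations in action, here is an illustrative one-cycle run of the predict/gain/update steps for a 1-D constant-velocity state $[\text{position}, \text{velocity}]$; all numbers are made up.

```python
import numpy as np

dt = 1.0
A = np.array([[1.0, dt], [0.0, 1.0]])  # motion model: position integrates velocity
B = np.zeros((2, 1))
u = np.zeros(1)                        # no control input
H = np.array([[1.0, 0.0]])             # we observe position only
Q = 0.01 * np.eye(2)                   # process noise covariance
R = np.array([[0.5]])                  # measurement noise covariance

x = np.array([0.0, 1.0])               # previous posterior mean
P = np.eye(2)                          # previous posterior covariance

# Predict
x_pred = A @ x + B @ u
P_pred = A @ P @ A.T + Q

# Update with a position measurement z = 1.2
z = np.array([1.2])
K = P_pred @ H.T @ np.linalg.inv(H @ P_pred @ H.T + R)
x_post = x_pred + K @ (z - H @ x_pred)
P_post = (np.eye(2) - K @ H) @ P_pred
```

After the update, the position variance `P_post[0, 0]` is smaller than the predicted `P_pred[0, 0]`, which is exactly the uncertainty reduction discussed above.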
The original Kalman filter requires both the motion model and the observation model to be linear, which is rarely true in practice. The Kalman filter variants below are different strategies for applying the Kalman filter to nonlinear systems.
3. Extended Kalman Filter (EKF)
The idea behind the EKF is straightforward: since the Kalman filter only applies to linear systems, simply linearize the nonlinear system. The motion and observation models are linearized with a first-order Taylor expansion:

$$f(x_{k-1}, u_k, w_k) \approx f(\hat{x}_{k-1}, u_k, 0) + F_k\,(x_{k-1} - \hat{x}_{k-1}) + W_k\,w_k$$

$$h(x_k, v_k) \approx h(\bar{x}_k, 0) + H_k\,(x_k - \bar{x}_k) + V_k\,v_k$$

The four Jacobians above are denoted by four matrices:

$$F_k = \left.\frac{\partial f}{\partial x}\right|_{\hat{x}_{k-1}},\quad W_k = \left.\frac{\partial f}{\partial w}\right|_{\hat{x}_{k-1}},\quad H_k = \left.\frac{\partial h}{\partial x}\right|_{\bar{x}_k},\quad V_k = \left.\frac{\partial h}{\partial v}\right|_{\bar{x}_k}$$

so the linearized models can be written as linear models in the deviations from the expansion points.
Next, to obtain the Kalman gain, we need the mean and covariance of the linearized motion and observation models.

Motion model:

$$\bar{x}_k = f(\hat{x}_{k-1}, u_k, 0),\qquad \bar{P}_k = F_k \hat{P}_{k-1} F_k^T + W_k Q W_k^T$$

Similarly, for the observation model:

$$K_k = \bar{P}_k H_k^T\left(H_k \bar{P}_k H_k^T + V_k R V_k^T\right)^{-1}$$

$$\hat{x}_k = \bar{x}_k + K_k\left(z_k - h(\bar{x}_k, 0)\right),\qquad \hat{P}_k = (I - K_k H_k)\,\bar{P}_k$$
```python
from typing import Callable, Tuple

import numpy as np
import numdifftools as nd  # numerical Jacobians


class ExtendedKalmanFilter:
    def __init__(self, state_dimension: int, observations_dimension: int):
        self.observations_dimension = observations_dimension
        self.state_dimension = state_dimension
        self.state_predicted = np.zeros(state_dimension)
        self.covariance_predicted = np.zeros((state_dimension, state_dimension))

    def set_initial(self, initial_state: np.ndarray, initial_covariance_estimate: np.ndarray):
        if initial_state.shape != (self.state_dimension,):
            raise ValueError("Incorrect dimension for initial state")
        if initial_covariance_estimate.shape != (self.state_dimension, self.state_dimension):
            raise ValueError("Incorrect dimension for state covariance")
        self.state_predicted = initial_state
        self.covariance_predicted = initial_covariance_estimate

    def _predict(self, transition_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
                 control_input: np.ndarray, process_covariance: np.ndarray) -> None:
        # Fixed: linearize around the *previous* posterior estimate before
        # propagating it (the original computed the Jacobian at the new state)
        transition_matrix = nd.Jacobian(lambda x: transition_function(x, control_input))(self.state_predicted)
        self.state_predicted = transition_function(self.state_predicted, control_input)
        self.covariance_predicted = (transition_matrix @ self.covariance_predicted @ transition_matrix.T
                                     + process_covariance)

    def _update(self, observations_function: Callable[[np.ndarray], np.ndarray], observed: np.ndarray,
                observations_covariance: np.ndarray) -> None:
        observations_matrix = nd.Jacobian(observations_function)(self.state_predicted)
        measurement_residual = observed - observations_function(self.state_predicted)
        residual_covariance = (observations_matrix @ self.covariance_predicted @ observations_matrix.T
                               + observations_covariance)
        kalman_gain = self.covariance_predicted @ observations_matrix.T @ np.linalg.inv(residual_covariance)
        self.state_predicted = self.state_predicted + kalman_gain @ measurement_residual
        self.covariance_predicted = (np.eye(self.state_dimension) - kalman_gain @ observations_matrix) \
            @ self.covariance_predicted

    def predict_update(self,
                       transition_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
                       observations_function: Callable[[np.ndarray], np.ndarray],
                       control_input: np.ndarray,
                       observed: np.ndarray,
                       process_covariance: np.ndarray,
                       observations_covariance: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        self._predict(transition_function, control_input, process_covariance)
        self._update(observations_function, observed, observations_covariance)
        return self.state_predicted, self.covariance_predicted
```
4. Iterative Extended Kalman Filter (IEKF)
The IEKF builds on the EKF by addressing the linearization error. The EKF linearizes the observation model around the prior estimate $\bar{x}_k$ at the current time, and the linearization error grows with the distance between the linearization point and the operating point. A very natural idea is therefore: once this linearization point has given us a posterior estimate $\hat{x}_k$ for the current time, can we re-linearize the observation model around that posterior and re-estimate the posterior distribution of the current state? That is exactly the idea of the IEKF.

Roughly, the procedure runs one EKF step first and then, starting from that first posterior estimate, repeats the following update until the iterates converge:

$$\hat{x}_k^{(j+1)} = \bar{x}_k + K_k^{(j)}\left(z_k - h(\hat{x}_k^{(j)}) - H_k^{(j)}\,(\bar{x}_k - \hat{x}_k^{(j)})\right),\qquad H_k^{(j)} = \left.\frac{\partial h}{\partial x}\right|_{\hat{x}_k^{(j)}}$$
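The iterated measurement update can be sketched as follows. This is a minimal sketch, not a full filter: `h` (the observation function), `H_jac` (its Jacobian), and the iteration limits are hypothetical inputs supplied by the caller.

```python
import numpy as np

def iekf_update(x_prior, P_prior, z, h, H_jac, R, n_iter=10, tol=1e-8):
    # Iterated EKF measurement update: re-linearize the observation model
    # around each new posterior iterate until the iterates stop changing.
    x = x_prior.copy()
    for _ in range(n_iter):
        H = H_jac(x)  # Jacobian at the current iterate
        K = P_prior @ H.T @ np.linalg.inv(H @ P_prior @ H.T + R)
        # Note the extra term: the expansion point is x, not the prior mean
        x_new = x_prior + K @ (z - h(x) - H @ (x_prior - x))
        if np.linalg.norm(x_new - x) < tol:
            x = x_new
            break
        x = x_new
    # Final covariance update at the converged linearization point
    H = H_jac(x)
    K = P_prior @ H.T @ np.linalg.inv(H @ P_prior @ H.T + R)
    P = (np.eye(len(x)) - K @ H) @ P_prior
    return x, P
```

For a linear observation model the loop converges immediately and reproduces the ordinary Kalman update, which is a useful sanity check.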
```python
import numpy as np
from numpy.linalg import inv
from scipy.linalg import expm

# Note: despite the section title, this reference snippet implements an
# *invariant* EKF on a matrix Lie group (a 9-dof state with 5x5 group
# elements, apparently SE_2(3)); `system` is expected to provide
# f_lie, adjoint, carat, Q, R, b and deltaT.
class InvariantEKF:
    def __init__(self, system, mu_0, sigma_0):
        self.sys = system
        self.mus = [mu_0]
        self.sigmas = [sigma_0]

    @property
    def mu(self):
        return self.mus[-1]  # latest state estimate (group element)

    @property
    def sigma(self):
        return self.sigmas[-1]  # latest covariance

    def predict(self, u):
        # Propagate the mean on the group; propagate the covariance via the adjoint
        mu_bar, U = self.sys.f_lie(self.mu, u)
        adj_U = self.sys.adjoint(U)
        sigma_bar = adj_U @ self.sigma @ adj_U.T + self.sys.Q * self.sys.deltaT**2
        self.mus.append(mu_bar)
        self.sigmas.append(sigma_bar)
        return mu_bar, sigma_bar

    def update(self, z):
        H = np.array([[0, 0, 0, 1, 0, 0, 0, 0, 0],
                      [0, 0, 0, 0, 1, 0, 0, 0, 0],
                      [0, 0, 0, 0, 0, 1, 0, 0, 0]])
        z = np.array([z[0], z[1], z[2], 1, 0])    # embed the measurement in the group's vector form
        V = (inv(self.mu) @ z - self.sys.b)[:-2]  # left-invariant innovation
        invmu = inv(self.mu)[:3, :3]
        K = self.sigma @ H.T @ inv(H @ self.sigma @ H.T + invmu @ self.sys.R @ invmu.T)
        self.mus[-1] = self.mu @ expm(self.sys.carat(K @ V))  # retract the correction onto the group
        self.sigmas[-1] = (np.eye(9) - K @ H) @ self.sigma
        return self.mu, self.sigma

    def iterate(self, us, zs):
        for u, z in zip(us, zs):
            self.predict(u)
            self.update(z)
        return np.array(self.mus)[1:], np.array(self.sigmas)[1:]
```
5. Error State Extended Kalman Filter (ES-EKF)
The ES-EKF splits the true state of the vehicle into a nominal state (the ideal state obtained while ignoring all errors) and an error state (the difference between the true state and the nominal state):

$$x_{\text{true}} = x \oplus \delta x$$

The idea of the ES-EKF is not to filter the true state, but to filter the error state instead. The basic flow is: keep propagating the nominal state with the error-free motion model, run the filter (both prediction and update) on the error state, and inject the estimated error into the nominal state to correct it toward the true value.
Linearizing the motion model around the nominal state at the previous time (which coincides with the prior estimate of the previous state) and rearranging terms yields a linear motion model for the error state:

$$\delta x_k = F_k\,\delta x_{k-1} + w_k$$

The observation model can be treated similarly:

$$z_k \approx h(x_k) + H_k\,\delta x_k + v_k$$
Prediction and update then work essentially as in the original Kalman filter.

In the prediction stage, the nominal state at the current time is predicted (from the previous prior if there was no observation update at the previous time, otherwise from the posterior), together with the mean and covariance of the error state:

$$x_k = f(x_{k-1}, u_k),\qquad \delta\bar{x}_k = F_k\,\delta\hat{x}_{k-1},\qquad \bar{P}_k = F_k \hat{P}_{k-1} F_k^T + Q$$

When a new observation arrives, it is used to update the error state (this is where the ES-EKF differs from the original Kalman filter). Compute the Kalman gain and the updated error state:

$$K_k = \bar{P}_k H_k^T\left(H_k \bar{P}_k H_k^T + R\right)^{-1},\qquad \delta\hat{x}_k = K_k\left(z_k - h(x_k)\right),\qquad \hat{P}_k = (I - K_k H_k)\,\bar{P}_k$$

Then inject the updated error state into the nominal state to obtain the posterior estimate of the true state (after which the error state is reset to zero):

$$\hat{x}_k = x_k \oplus \delta\hat{x}_k$$
```python
import numpy as np
# `Quaternion` and `skew_symmetric` are assumed to come from an external
# helper module (the quaternion utilities of the original project).

class ESEKF:
    def __init__(self, p_0=None, v_0=None, q_0=None):
        self.p = np.zeros(3) if p_0 is None else p_0  # estimated position
        self.v = np.zeros(3) if v_0 is None else v_0  # estimated velocity
        self.q = Quaternion(1.0, 0.0, 0.0, 0.0) if q_0 is None else q_0  # estimated orientation (quaternion)
        self.g = np.array([0, 0, -9.81])  # gravity vector
        self.a_b = 0  # accelerometer bias
        self.w_b = 0  # gyroscope bias
        self.F = np.eye(9)  # Jacobian of the error-state kinematics w.r.t. the error state
        self.Q = np.zeros((6, 6))  # covariance of the perturbation impulses
        self.p_cov = np.zeros((9, 9))  # covariance of the error state
        self.l_jac = np.zeros((9, 6))  # Jacobian of the error-state kinematics w.r.t. the perturbation impulses
        self.l_jac[3:, :] = np.eye(6)  # motion model noise Jacobian
        self.h_jac = np.zeros((3, 9))  # Jacobian of the position observation w.r.t. the nominal state
        self.h_jac[:, :3] = np.eye(3)  # measurement model Jacobian
        self.var_imu_f = 0.10  # variance of the accelerometer measurements
        self.var_imu_w = 0.25  # variance of the gyroscope measurements

    def prediction(self, a_m, w_m, dt):
        assert dt > 0
        R = self.q.to_mat()  # current orientation as a rotation matrix
        acce = R.dot(a_m - self.a_b)  # acceleration in the world frame
        a = acce + self.g  # add the gravity vector
        # Propagate the nominal position, velocity & orientation with the high-rate IMU measurements
        self.p = self.p + self.v * dt + 0.5 * a * dt * dt
        self.v = self.v + a * dt
        self.q = self.q.quat_mult_left(Quaternion(axis_angle=dt * (w_m - self.w_b)), 'Quaternion')
        # Propagate the covariance of the error state
        self.F[0:3, 3:6] = np.eye(3) * dt
        self.F[3:6, 6:9] = -skew_symmetric(acce) * dt
        self.Q[0:3, 0:3] = np.eye(3) * self.var_imu_f * (dt ** 2)
        self.Q[3:6, 3:6] = np.eye(3) * self.var_imu_w * (dt ** 2)
        self.p_cov = self.F @ self.p_cov @ self.F.T + self.l_jac @ self.Q @ self.l_jac.T

    def correction(self, sensor_var, y_k):
        # Kalman gain
        K = self.p_cov @ self.h_jac.T @ np.linalg.inv(self.h_jac @ self.p_cov @ self.h_jac.T + sensor_var)
        # Error state from the position measurement y_k
        delta_x = K.dot(y_k - self.p)
        # Inject the error state into the nominal state
        self.p = self.p + delta_x[0:3]
        self.v = self.v + delta_x[3:6]
        delta_quat = Quaternion(axis_angle=delta_x[6:9])
        self.q = delta_quat.quat_mult_left(self.q, 'Quaternion')
        # Corrected covariance of the error state
        self.p_cov = (np.eye(9) - K @ self.h_jac) @ self.p_cov
```
6. Unscented Kalman Filter (UKF)
The idea of the UKF is to deterministically sample a set of sigma points according to the properties of the Gaussian distribution, use these points to represent the current prior distribution, push them through the nonlinear motion and observation models, and then fit a distribution to the transformed points to approximate the posterior at the current time. The details are as follows:
- Prediction step:

First, stack the process noise and the posterior state of the previous time into an augmented state with a new mean and covariance:

$$\mu_{k-1}^a = \begin{bmatrix}\hat{x}_{k-1}\\0\end{bmatrix},\qquad \Sigma_{k-1}^a = \begin{bmatrix}\hat{P}_{k-1} & 0\\ 0 & Q\end{bmatrix}$$

Then pick $2L+1$ sigma points ($L$ being the augmented dimension) to approximate the current distribution:

$$\mathcal{X}^{[0]} = \mu^a,\qquad \mathcal{X}^{[i]} = \mu^a \pm \left(\sqrt{(L+\lambda)\,\Sigma^a}\right)_i,\quad i = 1,\dots,2L$$

Split each sigma point back into its state part and its noise part, and pass each through the motion model. This yields sigma points of the prior distribution of the current state. Each sigma point carries a weight, and the weights satisfy:

$$w_0^m = \frac{\lambda}{L+\lambda},\qquad w_0^c = w_0^m + (1-\alpha^2+\beta),\qquad w_i^m = w_i^c = \frac{1}{2(L+\lambda)}$$

Combining the points with these weights gives the mean and covariance of the predicted state at the current time:

$$\bar{x}_k = \sum_i w_i^m\,\mathcal{X}_k^{[i]},\qquad \bar{P}_k = \sum_i w_i^c\,\left(\mathcal{X}_k^{[i]} - \bar{x}_k\right)\left(\mathcal{X}_k^{[i]} - \bar{x}_k\right)^T$$
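To make the sigma-point construction concrete, here is a minimal sketch for a made-up 2-D Gaussian, using the common scaling parameters $\alpha=1$, $\beta=2$, $\kappa=0$ (so $\lambda=0$); it checks that recombining the points with the weights recovers the original moments.

```python
import numpy as np

n = 2
alpha, beta, kappa = 1.0, 2.0, 0.0
lam = alpha ** 2 * (n + kappa) - n

mean = np.array([1.0, 2.0])
cov = np.array([[2.0, 0.3], [0.3, 1.0]])

# 2n+1 sigma points: the mean plus/minus the columns of the scaled matrix square root
L = np.linalg.cholesky((n + lam) * cov)
points = [mean] + [mean + L[:, i] for i in range(n)] + [mean - L[:, i] for i in range(n)]

# Mean (wm) and covariance (wc) weights
wm = np.full(2 * n + 1, 1.0 / (2.0 * (n + lam)))
wm[0] = lam / (n + lam)
wc = wm.copy()
wc[0] += 1.0 - alpha ** 2 + beta

# Recombining the points with these weights recovers the original moments
mean_rec = sum(w * p for w, p in zip(wm, points))
cov_rec = sum(w * np.outer(p - mean_rec, p - mean_rec) for w, p in zip(wc, points))
```

In the filter, the same recombination is applied after the points have passed through the nonlinear model, which is where the approximation actually happens.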
- Correction step:

The idea is similar. First, augment the current prior estimate with the observation noise, then pick $2L+1$ sigma points from the augmented distribution in the same way, split them into a state part and an observation-noise part, and pass each through the observation model:

$$\mathcal{Z}^{[i]} = h\!\left(\mathcal{X}^{[i],x}\right) + \mathcal{X}^{[i],v}$$

Using the same weights as in the prediction step, compute the quantities needed for the Kalman gain and the correction:

$$\hat{z}_k = \sum_i w_i^m\,\mathcal{Z}^{[i]},\qquad S_k = \sum_i w_i^c\,\left(\mathcal{Z}^{[i]} - \hat{z}_k\right)\left(\mathcal{Z}^{[i]} - \hat{z}_k\right)^T$$

$$P_{xz} = \sum_i w_i^c\,\left(\mathcal{X}^{[i],x} - \bar{x}_k\right)\left(\mathcal{Z}^{[i]} - \hat{z}_k\right)^T,\qquad K_k = P_{xz}\,S_k^{-1}$$

$$\hat{x}_k = \bar{x}_k + K_k\left(z_k - \hat{z}_k\right),\qquad \hat{P}_k = \bar{P}_k - K_k S_k K_k^T$$
```python
from typing import Callable, Tuple

import numpy as np

# Note: the original snippet relied on helper methods that were not shown
# (_get_sigma_points, _get_combined_point, ...); the standard unscented-
# transform versions are filled in here, and the predicted covariance is
# computed from the *transformed* points (the original used the
# untransformed sigma points there).
class UnscentedKalmanFilter:
    def __init__(self, state_dimension: int, observations_dimension: int):
        self.observations_dimension = observations_dimension
        self.state_dimension = state_dimension
        self.state_predicted = np.zeros(state_dimension)
        self.covariance_predicted = np.zeros((state_dimension, state_dimension))
        # Standard unscented-transform scaling parameters
        self._alpha, self._beta, self._kappa = 1.0, 2.0, 0.0
        self.c = self._alpha ** 2 * (self.state_dimension + self._kappa)  # = n + lambda
        self.lambda_coeff = self.c - self.state_dimension

    def set_initial(self, initial_state: np.ndarray, initial_covariance_estimate: np.ndarray):
        if initial_state.shape != (self.state_dimension,):
            raise ValueError("Incorrect dimension for initial state")
        if initial_covariance_estimate.shape != (self.state_dimension, self.state_dimension):
            raise ValueError("Incorrect dimension for state covariance")
        self.state_predicted = initial_state
        self.covariance_predicted = initial_covariance_estimate

    def _weights(self) -> Tuple[np.ndarray, np.ndarray]:
        # Mean (wm) and covariance (wc) weights for the 2n+1 sigma points
        wm = np.full(2 * self.state_dimension + 1, 1.0 / (2.0 * self.c))
        wm[0] = self.lambda_coeff / self.c
        wc = wm.copy()
        wc[0] += 1.0 - self._alpha ** 2 + self._beta
        return wm, wc

    def _get_sigma_points(self, mean: np.ndarray, covariance: np.ndarray) -> list:
        # mean plus/minus the columns of the scaled matrix square root
        sqrt_cov = np.linalg.cholesky(self.c * covariance)
        return ([np.array(mean)]
                + [mean + sqrt_cov[:, i] for i in range(self.state_dimension)]
                + [mean - sqrt_cov[:, i] for i in range(self.state_dimension)])

    def _predict(self, transition_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
                 control_input: np.ndarray, process_covariance: np.ndarray) -> None:
        wm, wc = self._weights()
        sigma_points = self._get_sigma_points(self.state_predicted, self.covariance_predicted)
        states_predicted = [transition_function(point, control_input) for point in sigma_points]
        mean = sum(w * s for w, s in zip(wm, states_predicted))
        cov = sum(w * np.outer(s - mean, s - mean) for w, s in zip(wc, states_predicted)) + process_covariance
        self.state_predicted = mean
        self.covariance_predicted = 0.5 * (cov + cov.T)  # keep the covariance symmetric

    def _update(self, observations_function: Callable[[np.ndarray], np.ndarray], observed: np.ndarray,
                observations_covariance: np.ndarray) -> None:
        wm, wc = self._weights()
        sigma_points = self._get_sigma_points(self.state_predicted, self.covariance_predicted)
        measurements_predicted = [observations_function(point) for point in sigma_points]
        measurement_predicted = sum(w * m for w, m in zip(wm, measurements_predicted))
        self_covariance = sum(w * np.outer(m - measurement_predicted, m - measurement_predicted)
                              for w, m in zip(wc, measurements_predicted)) + observations_covariance
        cross_covariance = sum(w * np.outer(s - self.state_predicted, m - measurement_predicted)
                               for w, s, m in zip(wc, sigma_points, measurements_predicted))
        kalman_gain = cross_covariance @ np.linalg.inv(self_covariance)
        self.state_predicted = self.state_predicted + kalman_gain @ (observed - measurement_predicted)
        cov = self.covariance_predicted - kalman_gain @ self_covariance @ kalman_gain.T
        self.covariance_predicted = 0.5 * (cov + cov.T)

    def predict_update(self,
                       transition_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
                       observations_function: Callable[[np.ndarray], np.ndarray],
                       control_input: np.ndarray,
                       observed: np.ndarray,
                       process_covariance: np.ndarray,
                       observations_covariance: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        self._predict(transition_function, control_input, process_covariance)
        self._update(observations_function, observed, observations_covariance)
        return self.state_predicted, self.covariance_predicted
```
7. Particle Filter (PF)
The idea of the particle filter is somewhat similar to the UKF, except that it does not assume the state and noise to be Gaussian, so the sample points cannot be chosen by a fixed rule. Instead, relying on the law of large numbers, the particle filter draws a large number of samples, pushes these samples through the nonlinear transformation, and re-estimates the probability density from the results.

The specific steps are as follows:
Draw $M$ samples (particles) from the joint distribution of the previous posterior and the noise:

$$x_{k-1}^{[m]} \sim p(x_{k-1} \mid z_{1:k-1}, u_{1:k-1}),\quad m = 1,\dots,M$$

Pass each particle through the motion model as a nonlinear transformation:

$$x_k^{[m]} \sim p(x_k \mid x_{k-1}^{[m]}, u_k)$$

These particles approximate the prior distribution of the current state. At this point we do not know which particle is closer to the true value; the role of the observation is to judge the plausibility of each particle and assign weights accordingly. Each particle's weight is the likelihood of the current observation given that particle:

$$w_k^{[m]} \propto p(z_k \mid x_k^{[m]})$$

That is, use the observation model to obtain the theoretical observation distribution for each particle's state, and evaluate the probability of the actual observation under that distribution. Compute this for all particles and normalize to obtain the weights. Particles with small weights are then dropped, and the remaining particles are resampled according to their weights.
```python
from typing import Callable

import numpy as np
import scipy.stats
from filterpy.monte_carlo import stratified_resample  # resampling helper


class ParticleFilter:
    def __init__(self, state_dimension: int, observations_dimension: int, particles_number: int = 2000):
        self.observations_dimension = observations_dimension
        self.state_dimension = state_dimension
        self.particles = None
        self.weights = None
        self.particles_number = particles_number
        self.state_predicted = None

    def set_initial(self, initial_state: np.ndarray, initial_stds: np.ndarray):
        self.state_predicted = initial_state
        self.init_particles(initial_state, initial_stds, self.particles_number)

    def init_particles(self, initial_state, initial_stds, particles_number):
        # Spread the particles around the initial state with uniform weights
        self.particles = [initial_state + np.random.randn(initial_state.shape[0]) * initial_stds
                          for _ in range(particles_number)]
        self.weights = np.full((particles_number,), 1.0 / particles_number)

    def _predict(self, transition_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
                 control_input: np.ndarray, control_stds: np.ndarray) -> None:
        def get_control_input():
            # Inject motion noise by perturbing each particle's control input
            return control_input + np.random.randn(control_input.shape[0]) * control_stds
        self.particles = [transition_function(particle, get_control_input()) for particle in self.particles]

    def _update(self, observations_function: Callable[[np.ndarray], np.ndarray], observed: np.ndarray,
                observations_covariance: np.ndarray):
        for idx, particle in enumerate(self.particles):
            measurement = observations_function(particle)
            # Weight = likelihood of the actual observation under this particle's predicted observation
            distribution = scipy.stats.multivariate_normal(measurement, observations_covariance)
            self.weights[idx] = distribution.pdf(observed)
        self.weights += 1.e-300  # avoid division by zero when all weights vanish
        self.weights /= sum(self.weights)

    def _calculate_predicted(self):
        # Weighted mean of the particles as the state estimate
        self.state_predicted = np.zeros((self.state_dimension,))
        for idx, particle in enumerate(self.particles):
            self.state_predicted += self.weights[idx] * particle
        return self.state_predicted

    def _resampling_required(self):
        # Resample when the effective sample size drops below half the particle count
        return 1. / np.sum(np.square(self.weights)) < len(self.particles) / 2

    def _resample(self):
        indexes = stratified_resample(self.weights)
        self.particles = [self.particles[idx] for idx in indexes]
        self.weights.fill(1.0 / len(self.particles))  # reset to uniform after resampling

    def predict_update(self,
                       transition_function: Callable[[np.ndarray, np.ndarray], np.ndarray],
                       observations_function: Callable[[np.ndarray], np.ndarray],
                       control_input: np.ndarray,
                       observed: np.ndarray,
                       control_stds: np.ndarray,
                       observations_covariance: np.ndarray) -> np.ndarray:
        self._predict(transition_function, control_input, control_stds)
        self._update(observations_function, observed, observations_covariance)
        if self._resampling_required():
            self._resample()
        self._calculate_predicted()
        return self.state_predicted
```