EKF: move yaw estimator derivation into main filter derivation

Enables use of common components
This commit is contained in:
Paul Riseborough
2020-08-11 10:05:09 +10:00
committed by Paul Riseborough
parent f89c52e840
commit 510d93858f
6 changed files with 101 additions and 274 deletions
+2 -1
View File
@@ -293,6 +293,7 @@ void EKFGSF_yaw::predictEKF(const uint8_t model_index)
const float dvyVar = dvxVar; // variance of right delta velocity - (m/s)^2
const float dazVar = sq(_gyro_noise * _delta_ang_dt); // variance of yaw delta angle - rad^2
// optimized auto generated code from SymPy script src/lib/ecl/EKF/python/ekf_derivation/main.py
const float S0 = cosf(psi);
const float S1 = ecl::powf(S0, 2);
const float S2 = sinf(psi);
@@ -341,7 +342,7 @@ bool EKFGSF_yaw::updateEKF(const uint8_t model_index)
const float &P12 = _ekf_gsf[model_index].P(1,2);
const float &P22 = _ekf_gsf[model_index].P(2,2);
// optimized auto generated code
// optimized auto generated code from SymPy script src/lib/ecl/EKF/python/ekf_derivation/main.py
const float t0 = ecl::powf(P01, 2);
const float t1 = -t0;
const float t2 = P00*P11 + P00*velObsVar + P11*velObsVar + t1 + ecl::powf(velObsVar, 2);
+99
View File
@@ -24,6 +24,17 @@ def create_cov_matrix(i, j):
else:
return 0
def create_yaw_estimator_cov_matrix():
# define a symbolic covariance matrix
P = Matrix(3,3,create_cov_matrix)
for index in range(3):
for j in range(3):
if index > j:
P[index,j] = P[j,index]
return P
def create_Tbs_matrix(i, j):
return Symbol("Tbs(" + str(i) + "," + str(j) + ")", real=True)
# legacy array format
@@ -390,6 +401,92 @@ def beta_observation(P,state,R_to_body,vx,vy,vz,wx,wy):
return
# yaw estimator prediction and observation code
def yaw_estimator():
dt = symbols("dt", real=True) # dt (sec)
psi = symbols("psi", real=True) # yaw angle of body frame wrt earth frame
vn, ve = symbols("vn ve", real=True) # velocity in world frame (north/east) - m/sec
daz = symbols("daz", real=True) # IMU z axis delta angle measurement in body axes - rad
dazVar = symbols("dazVar", real=True) # IMU Z axis delta angle measurement variance (rad^2)
dvx, dvy = symbols("dvx dvy", real=True) # IMU x and y axis delta velocity measurement in body axes - m/sec
dvxVar, dvyVar = symbols("dvxVar dvyVar", real=True) # IMU x and y axis delta velocity measurement variance (m/s)^2
# derive the body to nav direction transformation matrix
Tbn = Matrix([[cos(psi) , -sin(psi)],
[sin(psi) , cos(psi)]])
# attitude update equation
psiNew = psi + daz
# velocity update equations
velNew = Matrix([vn,ve]) + Tbn*Matrix([dvx,dvy])
# Define the state vectors
stateVector = Matrix([vn,ve,psi])
# Define vector of process equations
newStateVector = Matrix([velNew,psiNew])
# Calculate state transition matrix
F = newStateVector.jacobian(stateVector)
# Derive the covariance prediction equations
# Error growth in the inertial solution is assumed to be driven by 'noise' in the delta angles and
# velocities, after bias effects have been removed.
# derive the control(disturbance) influence matrix from IMU noise to state noise
G = newStateVector.jacobian(Matrix([dvx,dvy,daz]))
# derive the state error matrix
distMatrix = Matrix([[dvxVar , 0 , 0],
[0 , dvyVar , 0],
[0 , 0 , dazVar]])
Q = G * distMatrix * G.T
# propagate covariance matrix
P = create_yaw_estimator_cov_matrix()
P_new = F * P * F.T + Q
P_new_simple = cse(P_new, symbols("S0:1000"), optimizations='basic')
yaw_estimator_covariance_generator = CodeGenerator("./generated/yaw_estimator_covariance_prediction_generated.cpp")
yaw_estimator_covariance_generator.print_string("Equations for covariance matrix prediction")
yaw_estimator_covariance_generator.write_subexpressions(P_new_simple[0])
yaw_estimator_covariance_generator.write_matrix(Matrix(P_new_simple[1]), "_ekf_gsf[model_index].P", True)
yaw_estimator_covariance_generator.close()
# derive the covariance update equation for a NE velocity observation
velObsVar = symbols("velObsVar", real=True) # velocity observation variance (m/s)^2
H = Matrix([[1,0,0],
[0,1,0]])
R = Matrix([[velObsVar , 0],
[0 , velObsVar]])
S = H * P * H.T + R
S_det_inv = 1 / S.det()
S_inv = S.inv()
K = (P * H.T) * S_inv
P_new = P - K * S * K.T
# optimize code
t, [S_det_inv_s, S_inv_s, K_s, P_new_s] = cse([S_det_inv, S_inv, K, P_new], symbols("t0:1000"), optimizations='basic')
yaw_estimator_observation_generator = CodeGenerator("./generated/yaw_estimator_measurement_update_generated.cpp")
yaw_estimator_observation_generator.print_string("Intermediate variables")
yaw_estimator_observation_generator.write_subexpressions(t)
yaw_estimator_observation_generator.print_string("Equations for NE velocity innovation variance's determinante inverse")
yaw_estimator_observation_generator.write_matrix(Matrix([[S_det_inv_s]]), "_ekf_gsf[model_index].S_det_inverse", False)
yaw_estimator_observation_generator.print_string("Equations for NE velocity innovation variance inverse")
yaw_estimator_observation_generator.write_matrix(Matrix(S_inv_s), "_ekf_gsf[model_index].S_inverse", True)
yaw_estimator_observation_generator.print_string("Equations for NE velocity Kalman gain")
yaw_estimator_observation_generator.write_matrix(Matrix(K_s), "K", False)
yaw_estimator_observation_generator.print_string("Equations for covariance matrix update")
yaw_estimator_observation_generator.write_matrix(Matrix(P_new_s), "_ekf_gsf[model_index].P", True)
yaw_estimator_observation_generator.close()
def generate_code():
print('Starting code generation:')
print('Creating symbolic variables ...')
@@ -521,6 +618,8 @@ def generate_code():
body_frame_velocity_observation(P,state,R_to_body,vx,vy,vz)
print('Generating body frame acceleration observation code ...')
body_frame_accel_observation(P,state,R_to_body,vx,vy,vz,wx,wy)
print('Generating yaw estimator code ...')
yaw_estimator()
print('Code generation finished!')
@@ -1,62 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Mar 14 12:47:24 2020
@author: roman
"""
from sympy import ccode
from sympy.codegen.ast import float32, real
class CodeGenerator:
def __init__(self, file_name):
self.file_name = file_name
self.file = open(self.file_name, 'w')
def print_string(self, string):
self.file.write("// " + string + "\n")
def get_ccode(self, expression):
return ccode(expression, type_aliases={real:float32})
def write_subexpressions(self,subexpressions):
output_string = ""
for item in subexpressions:
output_string = output_string + "const float " + str(item[0]) + " = " + self.get_ccode(item[1]) + ";\n"
output_string = output_string + "\n\n"
self.file.write(output_string)
def write_matrix(self, matrix, identifier, is_symmetric=False):
output_string = ""
if matrix.shape[0] * matrix.shape[1] == 1:
output_string = output_string + identifier + " = " + self.get_ccode(matrix[0]) + ";\n"
elif matrix.shape[0] == 1 or matrix.shape[1] == 1:
for i in range(0,len(matrix)):
if (identifier == "Kfusion"):
# Vector f format used by Kfusion
output_string = output_string + identifier + "(" + str(i) + ") = " + self.get_ccode(matrix[i]) + ";\n"
else:
# legacy array format used by Hfusion
output_string = output_string + identifier + "[" + str(i) + "] = " + self.get_ccode(matrix[i]) + ";\n"
else:
for j in range(0, matrix.shape[1]):
for i in range(0, matrix.shape[0]):
if j >= i or not is_symmetric:
output_string = output_string + identifier + "(" + str(i) + "," + str(j) + ") = " + self.get_ccode(matrix[i,j]) + ";\n"
# legacy array format
# output_string = output_string + identifier + "[" + str(i) + "][" + str(j) + "] = " + self.get_ccode(matrix[i,j]) + ";\n"
if is_symmetric:
output_string = output_string + "\n"
for j in range(0, matrix.shape[1]):
for i in range(0, matrix.shape[0]):
if j < i:
output_string = output_string + identifier + "(" + str(i) + "," + str(j) + ") = " + \
identifier + "(" + str(j) + "," + str(i) + ")" + ";\n"
output_string = output_string + "\n\n"
self.file.write(output_string)
def close(self):
self.file.close()
@@ -1,25 +0,0 @@
// Equations for covariance matrix prediction
const float S0 = cosf(psi);
const float S1 = powf(S0, 2);
const float S2 = sinf(psi);
const float S3 = powf(S2, 2);
const float S4 = S0*dvy + S2*dvx;
const float S5 = P02 - P22*S4;
const float S6 = S0*dvx - S2*dvy;
const float S7 = S0*S2;
const float S8 = P01 + S7*dvxVar - S7*dvyVar;
const float S9 = P12 + P22*S6;
_ekf_gsf[model_index].P(0,0) = P00 - P02*S4 + S1*dvxVar + S3*dvyVar - S4*S5;
_ekf_gsf[model_index].P(0,1) = -P12*S4 + S5*S6 + S8;
_ekf_gsf[model_index].P(1,1) = P11 + P12*S6 + S1*dvyVar + S3*dvxVar + S6*S9;
_ekf_gsf[model_index].P(0,2) = S5;
_ekf_gsf[model_index].P(1,2) = S9;
_ekf_gsf[model_index].P(2,2) = P22 + dazVar;
_ekf_gsf[model_index].P(1,0) = _ekf_gsf[model_index].P(0,1);
_ekf_gsf[model_index].P(2,0) = _ekf_gsf[model_index].P(0,2);
_ekf_gsf[model_index].P(2,1) = _ekf_gsf[model_index].P(1,2);
@@ -1,74 +0,0 @@
// Intermediate variables
const float t0 = powf(P01, 2);
const float t1 = -t0;
const float t2 = P00*P11 + P00*velObsVar + P11*velObsVar + t1 + powf(velObsVar, 2);
const float t3 = 1.0F/t2;
const float t4 = P11 + velObsVar;
const float t5 = P01*t3;
const float t6 = -t5;
const float t7 = P00 + velObsVar;
const float t8 = P00*t4 + t1;
const float t9 = t5*velObsVar;
const float t10 = P11*t7;
const float t11 = t1 + t10;
const float t12 = P01*P12;
const float t13 = P02*t4;
const float t14 = P01*P02;
const float t15 = P12*t7;
const float t16 = t0*velObsVar;
const float t17 = powf(t2, -2);
const float t18 = t4*velObsVar + t8;
const float t19 = t17*t18;
const float t20 = t17*(t16 + t7*t8);
const float t21 = t0 - t10;
const float t22 = t17*t21;
const float t23 = t14 - t15;
const float t24 = P01*t23;
const float t25 = t12 - t13;
const float t26 = t16 - t21*t4;
const float t27 = t17*t26;
const float t28 = t11 + t7*velObsVar;
const float t29 = t17*t8; // unused variable
const float t30 = t17*t28;
const float t31 = P01*t25;
const float t32 = t23*t4 + t31;
const float t33 = t17*t32;
const float t34 = P01*velObsVar; // Unused variable
const float t35 = t24 + t25*t7;
const float t36 = t17*t35;
// Equations for NE velocity innovation variance's determinante inverse
_ekf_gsf[model_index].S_det_inverse = t3;
// Equations for NE velocity innovation variance inverse
_ekf_gsf[model_index].S_inverse(0,0) = t3*t4;
_ekf_gsf[model_index].S_inverse(0,1) = t6;
_ekf_gsf[model_index].S_inverse(1,1) = t3*t7;
_ekf_gsf[model_index].S_inverse(1,0) = _ekf_gsf[model_index].S_inverse(0,1);
// Equations for NE velocity Kalman gain
K(0,0) = t3*t8;
K(1,0) = t9;
K(2,0) = t3*(-t12 + t13);
K(0,1) = t9;
K(1,1) = t11*t3;
K(2,1) = t3*(-t14 + t15);
// Equations for covariance matrix update
_ekf_gsf[model_index].P(0,0) = P00 - t16*t19 - t20*t8;
_ekf_gsf[model_index].P(0,1) = P01*(t18*t22 - t20*velObsVar + 1);
_ekf_gsf[model_index].P(1,1) = P11 - t16*t30 + t22*t26;
_ekf_gsf[model_index].P(0,2) = P02 + t19*t24 + t20*t25;
_ekf_gsf[model_index].P(1,2) = P12 + t23*t27 + t30*t31;
_ekf_gsf[model_index].P(2,2) = P22 - t23*t33 - t25*t36;
_ekf_gsf[model_index].P(1,0) = _ekf_gsf[model_index].P(0,1);
_ekf_gsf[model_index].P(2,0) = _ekf_gsf[model_index].P(0,2);
_ekf_gsf[model_index].P(2,1) = _ekf_gsf[model_index].P(1,2);
-112
View File
@@ -1,112 +0,0 @@
from sympy import *
from code_gen import *
def create_cov_matrix(i, j):
if j >= i:
return Symbol("P" + str(i) + str(j), real=True)
else:
return 0
def create_symmetric_cov_matrix():
# define a symbolic covariance matrix
P = Matrix(3,3,create_cov_matrix)
for index in range(3):
for j in range(3):
if index > j:
P[index,j] = P[j,index]
return P
print('Starting code generation:')
dt = symbols("dt", real=True) # dt (sec)
psi = symbols("psi", real=True) # yaw angle of body frame wrt earth frame
vn, ve = symbols("vn ve", real=True) # velocity in world frame (north/east) - m/sec
daz = symbols("daz", real=True) # IMU z axis delta angle measurement in body axes - rad
dazVar = symbols("dazVar", real=True) # IMU Z axis delta angle measurement variance (rad^2)
dvx, dvy = symbols("dvx dvy", real=True) # IMU x and y axis delta velocity measurement in body axes - m/sec
dvxVar, dvyVar = symbols("dvxVar dvyVar", real=True) # IMU x and y axis delta velocity measurement variance (m/s)^2
# derive the body to nav direction transformation matrix
Tbn = Matrix([[cos(psi) , -sin(psi)],
[sin(psi) , cos(psi)]])
# attitude update equation
psiNew = psi + daz
# velocity update equations
velNew = Matrix([vn,ve]) + Tbn*Matrix([dvx,dvy])
# Define the state vectors
stateVector = Matrix([vn,ve,psi])
# Define vector of process equations
newStateVector = Matrix([velNew,psiNew])
# Calculate state transition matrix
print('Computing state propagation jacobian ...')
F = newStateVector.jacobian(stateVector)
# Derive the covariance prediction equations
# Error growth in the inertial solution is assumed to be driven by 'noise' in the delta angles and
# velocities, after bias effects have been removed.
# derive the control(disturbance) influence matrix from IMU noise to state noise
G = newStateVector.jacobian(Matrix([dvx,dvy,daz]))
# derive the state error matrix
distMatrix = Matrix([[dvxVar , 0 , 0],
[0 , dvyVar , 0],
[0 , 0 , dazVar]])
Q = G * distMatrix * G.T
# propagate covariance matrix
P = create_symmetric_cov_matrix()
print('Computing covariance propagation ...')
P_new = F * P * F.T + Q
print('Simplifying covariance propagation ...')
P_new_simple = cse(P_new, symbols("S0:1000"), optimizations='basic')
print('Writing covariance propagation to file ...')
cov_prediction_code_generator = CodeGenerator("./generated/covariance_prediction_generated.cpp")
cov_prediction_code_generator.print_string("Equations for covariance matrix prediction")
cov_prediction_code_generator.write_subexpressions(P_new_simple[0])
cov_prediction_code_generator.write_matrix(Matrix(P_new_simple[1]), "_ekf_gsf[model_index].P", True)
cov_prediction_code_generator.close()
# derive the covariance update equation for a NE velocity observation
print('Computing NE velocity observatio innovation variance code ...')
velObsVar = symbols("velObsVar", real=True) # velocity observation variance (m/s)^2
H = Matrix([[1,0,0],
[0,1,0]])
R = Matrix([[velObsVar , 0],
[0 , velObsVar]])
print('Computing NE velocity measurement update code ...')
S = H * P * H.T + R
S_det_inv = 1 / S.det()
S_inv = S.inv()
K = (P * H.T) * S_inv
P_new = P - K * S * K.T
# optimize code
t, [S_det_inv_s, S_inv_s, K_s, P_new_s] = cse([S_det_inv, S_inv, K, P_new], symbols("t0:1000"), optimizations='basic')
print('Writing NE velocity measurement update code to file ...')
measurement_update_code_generator = CodeGenerator("./generated/measurement_update_generated.cpp")
measurement_update_code_generator.print_string("Intermediate variables")
measurement_update_code_generator.write_subexpressions(t)
measurement_update_code_generator.print_string("Equations for NE velocity innovation variance's determinante inverse")
measurement_update_code_generator.write_matrix(Matrix([[S_det_inv_s]]), "_ekf_gsf[model_index].S_det_inverse", False)
measurement_update_code_generator.print_string("Equations for NE velocity innovation variance inverse")
measurement_update_code_generator.write_matrix(Matrix(S_inv_s), "_ekf_gsf[model_index].S_inverse", True)
measurement_update_code_generator.print_string("Equations for NE velocity Kalman gain")
measurement_update_code_generator.write_matrix(Matrix(K_s), "K", False)
measurement_update_code_generator.print_string("Equations for covariance matrix update")
measurement_update_code_generator.write_matrix(Matrix(P_new_s), "_ekf_gsf[model_index].P", True)
measurement_update_code_generator.close()