EKF2: centralized auto-generated state (#22183)

* ekf2_derivation: use single source of state definition

The state is defined as an ordered dictionary of group elements and
everything else is generated using that state definition

* ekf2: generated state sample add const reference getter

---------

Co-authored-by: bresch <[brescianimathieu@gmail.com](mailto:brescianimathieu@gmail.com)>
Co-authored-by: Daniel Agar <daniel@agar.ca>
This commit is contained in:
Mathieu Bresciani
2023-10-06 16:28:21 +02:00
committed by GitHub
parent 2e1d5687f9
commit 05fd8c5976
15 changed files with 279 additions and 206 deletions
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) 2022 PX4 Development Team
Copyright (c) 2022-2023 PX4 Development Team
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
@@ -37,82 +37,92 @@ import symforce
symforce.set_epsilon_to_symbol()
import symforce.symbolic as sf
from symforce import typing as T
from symforce import ops
from symforce.values import Values
from derivation_utils import *
class State:
qw = 0
qx = 1
qy = 2
qz = 3
vx = 4
vy = 5
vz = 6
px = 7
py = 8
pz = 9
gyro_bx = 10
gyro_by = 11
gyro_bz = 12
accel_bx = 13
accel_by = 14
accel_bz = 15
ix = 16
iy = 17
iz = 18
ibx = 19
iby = 20
ibz = 21
wx = 22
wy = 23
n_states = 24
# The state vector is organized in an ordered dictionary
State = Values(
quat_nominal = sf.V4(),
vel = sf.V3(),
pos = sf.V3(),
gyro_bias = sf.V3(),
accel_bias = sf.V3(),
mag_I = sf.V3(),
mag_B = sf.V3(),
wind_vel = sf.V2()
)
class IdxDof():
def __init__(self, idx, dof):
self.idx = idx
self.dof = dof
def BuildTangentStateIndex():
# Build a dictionary that can be used to access elements of vectors
# and matrices defined in the state tangent space (e.g.: P, K and H)
tangent_state_index = {}
idx = 0
for key in State.keys_recursive():
dof = State[key].tangent_dim()
tangent_state_index[key] = IdxDof(idx, dof)
idx += dof
return tangent_state_index
tangent_idx = BuildTangentStateIndex()
class VState(sf.Matrix):
SHAPE = (State.n_states, 1)
SHAPE = (State.storage_dim(), 1)
class MState(sf.Matrix):
SHAPE = (State.n_states, State.n_states)
class VTangent(sf.Matrix):
SHAPE = (State.tangent_dim(), 1)
def state_to_quat(state: VState) -> sf.Quaternion:
return sf.Quaternion(sf.V3(state[State.qx], state[State.qy], state[State.qz]), state[State.qw])
class MTangent(sf.Matrix):
SHAPE = (State.tangent_dim(), State.tangent_dim())
def state_to_rot3(state: VState) -> sf.Rot3:
return sf.Rot3(state_to_quat(state))
def state_to_rot3(state: Values):
q = sf.Quaternion(sf.V3(state["quat_nominal"][1], state["quat_nominal"][2], state["quat_nominal"][3]), state["quat_nominal"][0])
return sf.Rot3(q)
def predict_covariance(
state: VState,
P: MState,
d_vel: sf.V3,
d_vel_dt: sf.Scalar,
d_vel_var: sf.V3,
d_ang: sf.V3,
d_ang_dt: sf.Scalar,
d_ang_var: sf.Scalar
):
state: VState,
P: MTangent,
d_vel: sf.V3,
d_vel_dt: sf.Scalar,
d_vel_var: sf.V3,
d_ang: sf.V3,
d_ang_dt: sf.Scalar,
d_ang_var: sf.Scalar
) -> MTangent:
state = State.from_storage(state)
g = sf.Symbol("g") # does not appear in the jacobians
accel_b = sf.V3(state[State.accel_bx], state[State.accel_by], state[State.accel_bz])
d_vel_b = accel_b * d_vel_dt
d_vel_b = state["accel_bias"] * d_vel_dt
d_vel_true = d_vel - d_vel_b
gyro_b = sf.V3(state[State.gyro_bx], state[State.gyro_by], state[State.gyro_bz])
d_ang_b = gyro_b * d_ang_dt
d_ang_b = state["gyro_bias"] * d_ang_dt
d_ang_true = d_ang - d_ang_b
q = state_to_quat(state)
R_to_earth = sf.Rot3(q)
v = sf.V3(state[State.vx], state[State.vy], state[State.vz])
p = sf.V3(state[State.px], state[State.py], state[State.pz])
q = sf.Quaternion(sf.V3(state["quat_nominal"][1], state["quat_nominal"][2], state["quat_nominal"][3]), state["quat_nominal"][0])
R_to_earth = state_to_rot3(state)
v = state["vel"]
p = state["pos"]
q_new = q * sf.Quaternion(sf.V3(0.5 * d_ang_true[0], 0.5 * d_ang_true[1], 0.5 * d_ang_true[2]), 1)
v_new = v + R_to_earth * d_vel_true + sf.V3(0, 0, g) * d_vel_dt
p_new = p + v * d_vel_dt
# Predicted state vector at time t + dt
state_new = VState.block_matrix([[sf.V4(q_new.w, q_new.x, q_new.y, q_new.z)], [v_new], [p_new], [sf.Matrix(state[State.gyro_bx:State.n_states])]])
state_new = state.copy()
state_new["quat_nominal"] = sf.V4(q_new.w, q_new.x, q_new.y, q_new.z), # convert to Hamiltonian form
state_new["vel"] = v_new,
state_new["pos"] = p_new,
# State propagation jacobian
A = state_new.jacobian(state)
G = state_new.jacobian(sf.V6.block_matrix([[d_vel], [d_ang]]))
A = VState(state_new.to_storage()).jacobian(state, tangent_space = False)
G = VState(state_new.to_storage()).jacobian(sf.V6.block_matrix([[d_vel], [d_ang]]), tangent_space = False)
# Covariance propagation
var_u = sf.Matrix.diag([d_vel_var[0], d_vel_var[1], d_vel_var[2], d_ang_var, d_ang_var, d_ang_var])
@@ -121,8 +131,8 @@ def predict_covariance(
# Generate the equations for the upper triangular matrix and the diagonal only
# Since the matrix is symmetric, the lower triangle does not need to be derived
# and can simply be copied in the implementation
for index in range(State.n_states):
for j in range(State.n_states):
for index in range(state.storage_dim()):
for j in range(state.storage_dim()):
if index > j:
P_new[index,j] = 0
@@ -130,43 +140,48 @@ def predict_covariance(
def compute_airspeed_innov_and_innov_var(
state: VState,
P: MState,
P: MTangent,
airspeed: sf.Scalar,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, sf.Scalar):
vel_rel = sf.V3(state[State.vx] - state[State.wx], state[State.vy] - state[State.wy], state[State.vz])
state = State.from_storage(state)
wind = sf.V3(state["wind_vel"][0], state["wind_vel"][1], 0.0)
vel_rel = state["vel"] - wind
airspeed_pred = vel_rel.norm(epsilon=epsilon)
innov = airspeed_pred - airspeed
H = sf.V1(airspeed_pred).jacobian(state)
H = sf.V1(airspeed_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov, innov_var)
def compute_airspeed_h_and_k(
state: VState,
P: MState,
P: MTangent,
innov_var: sf.Scalar,
epsilon: sf.Scalar
) -> (VState, VState):
) -> (VTangent, VTangent):
vel_rel = sf.V3(state[State.vx] - state[State.wx], state[State.vy] - state[State.wy], state[State.vz])
state = State.from_storage(state)
wind = sf.V3(state["wind_vel"][0], state["wind_vel"][1], 0.0)
vel_rel = state["vel"] - wind
airspeed_pred = vel_rel.norm(epsilon=epsilon)
H = sf.V1(airspeed_pred).jacobian(state)
H = sf.V1(airspeed_pred).jacobian(state, tangent_space=False)
K = P * H.T / sf.Max(innov_var, epsilon)
return (H.T, K)
def predict_sideslip(
state: VState,
state: State,
epsilon: sf.Scalar
) -> (sf.Scalar):
vel_rel = sf.V3(state[State.vx] - state[State.wx], state[State.vy] - state[State.wy], state[State.vz])
wind = sf.V3(state["wind_vel"][0], state["wind_vel"][1], 0.0)
vel_rel = state["vel"] - wind
relative_wind_body = state_to_rot3(state).inverse() * vel_rel
# Small angle approximation of side slip model
@@ -177,166 +192,176 @@ def predict_sideslip(
def compute_sideslip_innov_and_innov_var(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, sf.Scalar, sf.Scalar):
state = State.from_storage(state)
sideslip_pred = predict_sideslip(state, epsilon);
innov = sideslip_pred - 0.0
H = sf.V1(sideslip_pred).jacobian(state)
H = sf.V1(sideslip_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov, innov_var)
def compute_sideslip_h_and_k(
state: VState,
P: MState,
P: MTangent,
innov_var: sf.Scalar,
epsilon: sf.Scalar
) -> (VState, VState):
) -> (VTangent, VTangent):
state = State.from_storage(state)
sideslip_pred = predict_sideslip(state, epsilon);
H = sf.V1(sideslip_pred).jacobian(state)
H = sf.V1(sideslip_pred).jacobian(state, tangent_space=False)
K = P * H.T / sf.Max(innov_var, epsilon)
return (H.T, K)
def predict_mag_body(state) -> sf.V3:
mag_field_earth = sf.V3(state[State.ix], state[State.iy], state[State.iz])
mag_bias_body = sf.V3(state[State.ibx], state[State.iby], state[State.ibz])
mag_field_earth = state["mag_I"]
mag_bias_body = state["mag_B"]
mag_body = state_to_rot3(state).inverse() * mag_field_earth + mag_bias_body
return mag_body
def compute_mag_innov_innov_var_and_hx(
state: VState,
P: MState,
P: MTangent,
meas: sf.V3,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.V3, sf.V3, VState):
) -> (sf.V3, sf.V3, VTangent):
state = State.from_storage(state)
meas_pred = predict_mag_body(state);
innov = meas_pred - meas
innov_var = sf.V3()
Hx = sf.V1(meas_pred[0]).jacobian(state)
Hx = sf.V1(meas_pred[0]).jacobian(state, tangent_space=False)
innov_var[0] = (Hx * P * Hx.T + R)[0,0]
Hy = sf.V1(meas_pred[1]).jacobian(state)
Hy = sf.V1(meas_pred[1]).jacobian(state, tangent_space=False)
innov_var[1] = (Hy * P * Hy.T + R)[0,0]
Hz = sf.V1(meas_pred[2]).jacobian(state)
Hz = sf.V1(meas_pred[2]).jacobian(state, tangent_space=False)
innov_var[2] = (Hz * P * Hz.T + R)[0,0]
return (innov, innov_var, Hx.T)
def compute_mag_y_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
meas_pred = predict_mag_body(state);
H = sf.V1(meas_pred[1]).jacobian(state)
H = sf.V1(meas_pred[1]).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov_var, H.T)
def compute_mag_z_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
meas_pred = predict_mag_body(state);
H = sf.V1(meas_pred[2]).jacobian(state)
H = sf.V1(meas_pred[2]).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov_var, H.T)
def compute_yaw_321_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
R_to_earth = state_to_rot3(state).to_rotation_matrix()
# Fix the singularity at pi/2 by inserting epsilon
meas_pred = sf.atan2(R_to_earth[1,0], R_to_earth[0,0], epsilon=epsilon)
H = sf.V1(meas_pred).jacobian(state)
H = sf.V1(meas_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov_var, H.T)
def compute_yaw_321_innov_var_and_h_alternate(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
R_to_earth = state_to_rot3(state).to_rotation_matrix()
# Alternate form that has a singularity at yaw 0 instead of pi/2
meas_pred = sf.pi/2 - sf.atan2(R_to_earth[0,0], R_to_earth[1,0], epsilon=epsilon)
H = sf.V1(meas_pred).jacobian(state)
H = sf.V1(meas_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov_var, H.T)
def compute_yaw_312_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
R_to_earth = state_to_rot3(state).to_rotation_matrix()
# Alternate form to be used when close to pitch +-pi/2
meas_pred = sf.atan2(-R_to_earth[0,1], R_to_earth[1,1], epsilon=epsilon)
H = sf.V1(meas_pred).jacobian(state)
H = sf.V1(meas_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov_var, H.T)
def compute_yaw_312_innov_var_and_h_alternate(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
R_to_earth = state_to_rot3(state).to_rotation_matrix()
# Alternate form to be used when close to pitch +-pi/2
meas_pred = sf.pi/2 - sf.atan2(-R_to_earth[1,1], R_to_earth[0,1], epsilon=epsilon)
H = sf.V1(meas_pred).jacobian(state)
H = sf.V1(meas_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (innov_var, H.T)
def compute_mag_declination_pred_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, sf.Scalar, VState):
) -> (sf.Scalar, sf.Scalar, VTangent):
meas_pred = sf.atan2(state[State.iy], state[State.ix], epsilon=epsilon)
state = State.from_storage(state)
meas_pred = sf.atan2(state["mag_I"][1], state["mag_I"][0], epsilon=epsilon)
H = sf.V1(meas_pred).jacobian(state)
H = sf.V1(meas_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (meas_pred, innov_var, H.T)
@@ -345,8 +370,7 @@ def predict_opt_flow(state, distance, epsilon):
R_to_body = state_to_rot3(state).inverse()
# Calculate earth relative velocity in a non-rotating sensor frame
v = sf.V3(state[State.vx], state[State.vy], state[State.vz])
rel_vel_sensor = R_to_body * v
rel_vel_sensor = R_to_body * state["vel"]
# Divide by range to get predicted angular LOS rates relative to X and Y
# axes. Note these are rates in a non-rotating sensor frame
@@ -360,43 +384,46 @@ def predict_opt_flow(state, distance, epsilon):
def compute_flow_xy_innov_var_and_hx(
state: VState,
P: MState,
P: MTangent,
distance: sf.Scalar,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.V2, VState):
) -> (sf.V2, VTangent):
state = State.from_storage(state)
meas_pred = predict_opt_flow(state, distance, epsilon);
innov_var = sf.V2()
Hx = sf.V1(meas_pred[0]).jacobian(state)
Hx = sf.V1(meas_pred[0]).jacobian(state, tangent_space=False)
innov_var[0] = (Hx * P * Hx.T + R)[0,0]
Hy = sf.V1(meas_pred[1]).jacobian(state)
Hy = sf.V1(meas_pred[1]).jacobian(state, tangent_space=False)
innov_var[1] = (Hy * P * Hy.T + R)[0,0]
return (innov_var, Hx.T)
def compute_flow_y_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
distance: sf.Scalar,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
meas_pred = predict_opt_flow(state, distance, epsilon);
Hy = sf.V1(meas_pred[1]).jacobian(state)
Hy = sf.V1(meas_pred[1]).jacobian(state, tangent_space=False)
innov_var = (Hy * P * Hy.T + R)[0,0]
return (innov_var, Hy.T)
def compute_gnss_yaw_pred_innov_var_and_h(
state: VState,
P: MState,
P: MTangent,
antenna_yaw_offset: sf.Scalar,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, sf.Scalar, VState):
) -> (sf.Scalar, sf.Scalar, VTangent):
state = State.from_storage(state)
R_to_earth = state_to_rot3(state)
# define antenna vector in body frame
@@ -408,13 +435,13 @@ def compute_gnss_yaw_pred_innov_var_and_h(
# Calculate the yaw angle from the projection
meas_pred = sf.atan2(ant_vec_ef[1], ant_vec_ef[0], epsilon=epsilon)
H = sf.V1(meas_pred).jacobian(state)
H = sf.V1(meas_pred).jacobian(state, tangent_space=False)
innov_var = (H * P * H.T + R)[0,0]
return (meas_pred, innov_var, H.T)
def predict_drag(
state: VState,
state: State,
rho: sf.Scalar,
cd: sf.Scalar,
cm: sf.Scalar,
@@ -422,9 +449,8 @@ def predict_drag(
) -> (sf.Scalar):
R_to_body = state_to_rot3(state).inverse()
vel_rel = sf.V3(state[State.vx] - state[State.wx],
state[State.vy] - state[State.wy],
state[State.vz])
wind = sf.V3(state["wind_vel"][0], state["wind_vel"][1], 0.0)
vel_rel = state["vel"] - wind
vel_rel_body = R_to_body * vel_rel
vel_rel_body_xy = sf.V2(vel_rel_body[0], vel_rel_body[1])
@@ -436,52 +462,53 @@ def predict_drag(
def compute_drag_x_innov_var_and_k(
state: VState,
P: MState,
P: MTangent,
rho: sf.Scalar,
cd: sf.Scalar,
cm: sf.Scalar,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
meas_pred = predict_drag(state, rho, cd, cm, epsilon)
Hx = sf.V1(meas_pred[0]).jacobian(state)
Hx = sf.V1(meas_pred[0]).jacobian(state, tangent_space=False)
innov_var = (Hx * P * Hx.T + R)[0,0]
Ktotal = P * Hx.T / sf.Max(innov_var, epsilon)
K = VState()
K[State.wx] = Ktotal[State.wx]
K[State.wy] = Ktotal[State.wy]
K = VTangent()
K[tangent_idx["wind_vel"].idx: tangent_idx["wind_vel"].idx + tangent_idx["wind_vel"].dof] = Ktotal[tangent_idx["wind_vel"].idx: tangent_idx["wind_vel"].idx + tangent_idx["wind_vel"].dof]
return (innov_var, K)
def compute_drag_y_innov_var_and_k(
state: VState,
P: MState,
P: MTangent,
rho: sf.Scalar,
cd: sf.Scalar,
cm: sf.Scalar,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.Scalar, VState):
) -> (sf.Scalar, VTangent):
state = State.from_storage(state)
meas_pred = predict_drag(state, rho, cd, cm, epsilon)
Hy = sf.V1(meas_pred[1]).jacobian(state)
Hy = sf.V1(meas_pred[1]).jacobian(state, tangent_space=False)
innov_var = (Hy * P * Hy.T + R)[0,0]
Ktotal = P * Hy.T / sf.Max(innov_var, epsilon)
K = VState()
K[State.wx] = Ktotal[State.wx]
K[State.wy] = Ktotal[State.wy]
K = VTangent()
K[tangent_idx["wind_vel"].idx: tangent_idx["wind_vel"].idx + tangent_idx["wind_vel"].dof] = Ktotal[tangent_idx["wind_vel"].idx: tangent_idx["wind_vel"].idx + tangent_idx["wind_vel"].dof]
return (innov_var, K)
def compute_gravity_innov_var_and_k_and_h(
state: VState,
P: MState,
P: MTangent,
meas: sf.V3,
R: sf.Scalar,
epsilon: sf.Scalar
) -> (sf.V3, sf.V3, VState, VState, VState):
) -> (sf.V3, sf.V3, VTangent, VTangent, VTangent):
state = State.from_storage(state)
# get transform from earth to body frame
R_to_body = state_to_rot3(state).inverse()
@@ -497,7 +524,7 @@ def compute_gravity_innov_var_and_k_and_h(
# calculate observation jacobian (H), kalman gain (K), and innovation variance (S)
# for each axis
for i in range(3):
H = sf.V1(meas_pred[i]).jacobian(state)
H = sf.V1(meas_pred[i]).jacobian(state, tangent_space=False)
innov_var[i] = (H * P * H.T + R)[0,0]
K[i] = P * H.T / innov_var[i]
@@ -505,22 +532,24 @@ def compute_gravity_innov_var_and_k_and_h(
def quat_var_to_rot_var(
state: VState,
P: MState,
P: MTangent,
epsilon: sf.Scalar
):
J = sf.V3(state_to_rot3(state).to_tangent(epsilon=epsilon)).jacobian(state)
) -> sf.V3:
state = State.from_storage(state)
J = sf.V3(state_to_rot3(state).to_tangent(epsilon=epsilon)).jacobian(state, tangent_space=False)
rot_cov = J * P * J.T
return sf.V3(rot_cov[0, 0], rot_cov[1, 1], rot_cov[2, 2])
def rot_var_ned_to_lower_triangular_quat_cov(
state: VState,
rot_var_ned: sf.V3
):
) -> sf.M44:
# This function converts an attitude variance defined by a 3D vector in NED frame
# into a 4x4 covariance matrix representing the uncertainty on each of the 4 quaternion parameters
# Note: the resulting quaternion uncertainty is defined as a perturbation
# at the tip of the quaternion (i.e.:body-frame uncertainty)
q = sf.V4(state[State.qw], state[State.qx], state[State.qy], state[State.qz])
state = State.from_storage(state)
q = state["quat_nominal"]
attitude = state_to_rot3(state)
J = q.jacobian(attitude)
@@ -536,12 +565,11 @@ def rot_var_ned_to_lower_triangular_quat_cov(
return q_var.lower_triangle()
print("Derive EKF2 equations...")
generate_px4_function(predict_covariance, output_names=["P_new"])
generate_px4_function(compute_airspeed_innov_and_innov_var, output_names=["innov", "innov_var"])
generate_px4_function(compute_airspeed_h_and_k, output_names=["H", "K"])
generate_px4_function(compute_sideslip_innov_and_innov_var, output_names=["innov", "innov_var"])
generate_px4_function(compute_sideslip_h_and_k, output_names=["H", "K"])
generate_px4_function(predict_covariance, output_names=["P_new"])
generate_px4_function(compute_mag_innov_innov_var_and_hx, output_names=["innov", "innov_var", "Hx"])
generate_px4_function(compute_mag_y_innov_var_and_h, output_names=["innov_var", "H"])
generate_px4_function(compute_mag_z_innov_var_and_h, output_names=["innov_var", "H"])
@@ -559,11 +587,4 @@ generate_px4_function(compute_gravity_innov_var_and_k_and_h, output_names=["inno
generate_px4_function(quat_var_to_rot_var, output_names=["rot_var"])
generate_px4_function(rot_var_ned_to_lower_triangular_quat_cov, output_names=["q_cov_lower_triangle"])
generate_px4_state({"quat_nominal": sf.V4,
"vel": sf.V3,
"pos": sf.V3,
"gyro_bias": sf.V3,
"accel_bias": sf.V3,
"mag_I": sf.V3,
"mag_B": sf.V3,
"wind_vel": sf.V2})
generate_px4_state(State, tangent_idx)
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright (c) 2022 PX4 Development Team
Copyright (c) 2022-2023 PX4 Development Team
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
@@ -88,8 +88,63 @@ def generate_python_function(function_name, output_names):
output_dir="generated",
skip_directory_nesting=True)
def generate_px4_state(states):
print("Generate EKF state definition")
def build_state_struct(state, T="float"):
out = "struct StateSample {\n"
def TypeFromLength(len):
if len == 1:
return f"{T}"
elif len == 2:
return f"matrix::Vector2<{T}>"
elif len == 3:
return f"matrix::Vector3<{T}>"
elif len == 4:
return f"matrix::Quaternion<{T}>"
else:
raise NotImplementedError
for key, val in state.items():
out += f"\t{TypeFromLength(len(val))} {key}{{}};\n"
state_size = state.storage_dim()
out += f"\n\tmatrix::Vector<{T}, {state_size}> Data() const {{\n" \
+ f"\t\tmatrix::Vector<{T}, {state_size}> state;\n"
index = state.index()
for key in index:
out += f"\t\tstate.slice<{index[key].storage_dim}, 1>({index[key].offset}, 0) = {key};\n"
out += "\t\treturn state;\n"
out += "\t};\n" # Data
# const ref vector access
first_field = next(iter(state))
out += f"\n\tconst matrix::Vector<{T}, {state_size}>& vector() const {{\n" \
+ f"\t\treturn *reinterpret_cast<matrix::Vector<{T}, {state_size}>*>(const_cast<float*>(reinterpret_cast<const {T}*>(&{first_field})));\n" \
+ f"\t}};\n\n"
out += "};\n" # StateSample
out += f"static_assert(sizeof(matrix::Vector<{T}, {state_size}>) == sizeof(StateSample), \"state vector doesn't match StateSample size\");\n"
return out
def build_tangent_state_struct(state, tangent_state_index):
out = "struct IdxDof { unsigned idx; unsigned dof; };\n"
out += "namespace State {\n"
start_index = 0
for key in tangent_state_index.keys():
out += f"\tstatic constexpr IdxDof {key}{{{tangent_state_index[key].idx}, {tangent_state_index[key].dof}}};\n"
out += f"\tstatic constexpr uint8_t size{{{state.tangent_dim()}}};\n"
out += "};\n" # namespace State
return out
def generate_px4_state(state, tangent_state_index):
print("Generate EKF tangent state definition")
filename = "state.h"
f = open(f"./generated/{filename}", "w")
header = ["// --------------------------------------------------\n",
@@ -97,21 +152,14 @@ def generate_px4_state(states):
"// --------------------------------------------------\n",
"\n#ifndef EKF_STATE_H",
"\n#define EKF_STATE_H\n\n",
"#include <matrix/math.hpp>\n\n",
"namespace estimator\n{\n"]
f.writelines(header)
f.write("struct IdxDof { unsigned idx; unsigned dof; };\n");
f.write(build_state_struct(state))
f.write("\n")
f.write(build_tangent_state_struct(state, tangent_state_index))
f.write("namespace State {\n");
start_index = 0
for (state_name, state_type) in states.items():
tangent_dim = state_type.tangent_dim()
f.write(f"\tstatic constexpr IdxDof {state_name}{{{start_index}, {tangent_dim}}};\n")
start_index += tangent_dim
f.write(f"\tstatic constexpr uint8_t size{{{start_index}}};\n")
f.write("};\n") # namespace State
f.write("}\n") # namespace estimator
f.write("#endif // !EKF_STATE_H\n")
f.close()
@@ -5,8 +5,40 @@
#ifndef EKF_STATE_H
#define EKF_STATE_H
#include <matrix/math.hpp>
namespace estimator
{
struct StateSample {
matrix::Quaternion<float> quat_nominal{};
matrix::Vector3<float> vel{};
matrix::Vector3<float> pos{};
matrix::Vector3<float> gyro_bias{};
matrix::Vector3<float> accel_bias{};
matrix::Vector3<float> mag_I{};
matrix::Vector3<float> mag_B{};
matrix::Vector2<float> wind_vel{};
matrix::Vector<float, 24> Data() const {
matrix::Vector<float, 24> state;
state.slice<4, 1>(0, 0) = quat_nominal;
state.slice<3, 1>(4, 0) = vel;
state.slice<3, 1>(7, 0) = pos;
state.slice<3, 1>(10, 0) = gyro_bias;
state.slice<3, 1>(13, 0) = accel_bias;
state.slice<3, 1>(16, 0) = mag_I;
state.slice<3, 1>(19, 0) = mag_B;
state.slice<2, 1>(22, 0) = wind_vel;
return state;
};
const matrix::Vector<float, 24>& vector() const {
return *reinterpret_cast<matrix::Vector<float, 24>*>(const_cast<float*>(reinterpret_cast<const float*>(&quat_nominal)));
};
};
static_assert(sizeof(matrix::Vector<float, 24>) == sizeof(StateSample), "state vector doesn't match StateSample size");
struct IdxDof { unsigned idx; unsigned dof; };
namespace State {
static constexpr IdxDof quat_nominal{0, 4};