mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-04-14 10:07:39 +08:00
They can be selected independently in the control parameter, so there is no reason why they should share the same flag.
142 lines
5.2 KiB
Python
142 lines
5.2 KiB
Python
#! /usr/bin/env python3
|
|
"""
|
|
the ecl ekf analysis
|
|
"""
|
|
|
|
from typing import Tuple, List, Dict
|
|
|
|
import numpy as np
|
|
from pyulog import ULog
|
|
|
|
from analysis.detectors import InAirDetector, PreconditionError
|
|
from analysis.metrics import calculate_ecl_ekf_metrics
|
|
from analysis.checks import perform_ecl_ekf_checks
|
|
from analysis.post_processing import get_gps_check_fail_flags
|
|
|
|
def analyse_ekf(
|
|
ulog: ULog, check_levels: Dict[str, float], multi_instance: int = 0,
|
|
red_thresh: float = 1.0, amb_thresh: float = 0.5, min_flight_duration_seconds: float = 5.0,
|
|
in_air_margin_seconds: float = 5.0, pos_checks_when_sensors_not_fused: bool = False) -> \
|
|
Tuple[str, Dict[str, str], Dict[str, float], Dict[str, float]]:
|
|
"""
|
|
:param ulog:
|
|
:param check_levels:
|
|
:param multi_instance:
|
|
:param red_thresh:
|
|
:param amb_thresh:
|
|
:param min_flight_duration_seconds:
|
|
:param in_air_margin_seconds:
|
|
:param pos_checks_when_sensors_not_fused:
|
|
:return:
|
|
"""
|
|
|
|
try:
|
|
estimator_states = ulog.get_dataset('estimator_states', multi_instance).data
|
|
except:
|
|
raise PreconditionError('could not find estimator_states instance', multi_instance)
|
|
|
|
try:
|
|
estimator_status = ulog.get_dataset('estimator_status', multi_instance).data
|
|
except:
|
|
raise PreconditionError('could not find estimator_status instance', multi_instance)
|
|
|
|
try:
|
|
estimator_status_flags = ulog.get_dataset('estimator_status_flags', multi_instance).data
|
|
except:
|
|
raise PreconditionError('could not find estimator_status_flags instance', multi_instance)
|
|
|
|
try:
|
|
_ = ulog.get_dataset('estimator_innovations', multi_instance).data
|
|
except:
|
|
raise PreconditionError('could not find estimator_innovations instance', multi_instance)
|
|
|
|
try:
|
|
in_air = InAirDetector(
|
|
ulog, min_flight_time_seconds=min_flight_duration_seconds, in_air_margin_seconds=0.0)
|
|
in_air_no_ground_effects = InAirDetector(
|
|
ulog, min_flight_time_seconds=min_flight_duration_seconds,
|
|
in_air_margin_seconds=in_air_margin_seconds)
|
|
except Exception as e:
|
|
raise PreconditionError(str(e))
|
|
|
|
if in_air_no_ground_effects.take_off is None:
|
|
raise PreconditionError('no airtime detected.')
|
|
|
|
airtime_info = {
|
|
'in_air_transition_time': round(in_air.take_off + in_air.log_start, 2),
|
|
'on_ground_transition_time': round(in_air.landing + in_air.log_start, 2)}
|
|
|
|
gps_fail_flags = get_gps_check_fail_flags(estimator_status)
|
|
|
|
sensor_checks, innov_fail_checks = find_checks_that_apply(
|
|
estimator_status_flags, estimator_status,
|
|
pos_checks_when_sensors_not_fused=pos_checks_when_sensors_not_fused)
|
|
|
|
metrics = calculate_ecl_ekf_metrics(
|
|
ulog, estimator_status_flags, innov_fail_checks, sensor_checks, in_air, in_air_no_ground_effects,
|
|
multi_instance, red_thresh=red_thresh, amb_thresh=amb_thresh)
|
|
|
|
check_status, master_status = perform_ecl_ekf_checks(
|
|
metrics, sensor_checks, innov_fail_checks, check_levels)
|
|
|
|
return master_status, check_status, metrics, airtime_info
|
|
|
|
|
|
def find_checks_that_apply(
|
|
estimator_status_flags: dict, estimator_status: dict, pos_checks_when_sensors_not_fused: bool = False) ->\
|
|
Tuple[List[str], List[str]]:
|
|
"""
|
|
finds the checks that apply and stores them in lists for the std checks and the innovation
|
|
fail checks.
|
|
:param estimator_status_flags:
|
|
:param estimator_status:
|
|
:param b_pos_only_when_sensors_fused:
|
|
:return: a tuple of two lists that contain strings for the std checks and for the innovation
|
|
fail checks.
|
|
"""
|
|
sensor_checks = list()
|
|
innov_fail_checks = list()
|
|
|
|
# Height Sensor Checks
|
|
sensor_checks.append('hgt')
|
|
innov_fail_checks.append('posv')
|
|
|
|
# Magnetometer Sensor Checks
|
|
if (np.amax(estimator_status_flags['cs_yaw_align']) > 0.5):
|
|
sensor_checks.append('mag')
|
|
|
|
innov_fail_checks.append('magx')
|
|
innov_fail_checks.append('magy')
|
|
innov_fail_checks.append('magz')
|
|
innov_fail_checks.append('yaw')
|
|
|
|
# Velocity Sensor Checks
|
|
if (np.amax(estimator_status_flags['cs_gnss_vel']) > 0.5):
|
|
sensor_checks.append('vel')
|
|
innov_fail_checks.append('velh')
|
|
innov_fail_checks.append('velv')
|
|
|
|
# Position Sensor Checks
|
|
if (pos_checks_when_sensors_not_fused or (np.amax(estimator_status_flags['cs_gnss_pos']) > 0.5)
|
|
or (np.amax(estimator_status_flags['cs_ev_pos']) > 0.5)):
|
|
sensor_checks.append('pos')
|
|
innov_fail_checks.append('posh')
|
|
|
|
# Airspeed Sensor Checks
|
|
# a value > 1.0 means the measurement data for that test has been rejected by the EKF
|
|
if (np.amax(estimator_status['tas_test_ratio']) > 0.0):
|
|
sensor_checks.append('tas')
|
|
innov_fail_checks.append('tas')
|
|
|
|
# Height above ground (rangefinder) sensor checks
|
|
if (np.amax(estimator_status['hagl_test_ratio']) > 0.0):
|
|
sensor_checks.append('hagl')
|
|
innov_fail_checks.append('hagl')
|
|
|
|
# optical flow sensor checks
|
|
if (np.amax(estimator_status_flags['cs_opt_flow']) > 0.5):
|
|
innov_fail_checks.append('ofx')
|
|
innov_fail_checks.append('ofy')
|
|
|
|
return sensor_checks, innov_fail_checks
|