CI: improve mavros SITL tests logging (#8714)

* add more logging to help with #8556
* log subscribed topics on mission start and test exit (pass or fail)
* use mavlink enums everywhere to avoid maintaining dictionary mappings and to have readable values
* log when the FCU advances to next mission item without satisfying the position reached offset/radius
* some renaming for readability
* log more state value changes (connected and MAV_STATUS)
This commit is contained in:
Anthony Lamping
2018-01-17 17:54:32 -05:00
committed by Daniel Agar
parent 821eb5ac87
commit 85ba160757
4 changed files with 132 additions and 98 deletions
@@ -43,8 +43,9 @@ PKG = 'px4'
import rospy
from geometry_msgs.msg import Quaternion, Vector3
from mavros_msgs.msg import AttitudeTarget, ExtendedState
from mavros_msgs.msg import AttitudeTarget
from mavros_test_common import MavrosTestCommon
from pymavlink import mavutil
from std_msgs.msg import Header
from threading import Thread
from tf.transformations import quaternion_from_euler
@@ -72,7 +73,7 @@ class MavrosOffboardAttctlTest(MavrosTestCommon):
self.att_thread.start()
def tearDown(self):
pass
super(MavrosOffboardAttctlTest, self).tearDown()
#
# Helper methods
@@ -107,8 +108,10 @@ class MavrosOffboardAttctlTest(MavrosTestCommon):
# make sure the simulation is ready to start the mission
self.wait_for_topics(60)
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
self.wait_for_landed_state(mavutil.mavlink.MAV_LANDED_STATE_ON_GROUND,
10, -1)
self.log_topic_vars()
self.set_mode("OFFBOARD", 5)
self.set_arm(True, 5)
@@ -45,8 +45,8 @@ import rospy
import math
import numpy as np
from geometry_msgs.msg import PoseStamped, Quaternion
from mavros_msgs.msg import ExtendedState
from mavros_test_common import MavrosTestCommon
from pymavlink import mavutil
from std_msgs.msg import Header
from threading import Thread
from tf.transformations import quaternion_from_euler
@@ -77,7 +77,7 @@ class MavrosOffboardPosctlTest(MavrosTestCommon):
self.pos_thread.start()
def tearDown(self):
pass
super(MavrosOffboardPosctlTest, self).tearDown()
#
# Helper methods
@@ -155,8 +155,10 @@ class MavrosOffboardPosctlTest(MavrosTestCommon):
# make sure the simulation is ready to start the mission
self.wait_for_topics(60)
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
self.wait_for_landed_state(mavutil.mavlink.MAV_LANDED_STATE_ON_GROUND,
10, -1)
self.log_topic_vars()
self.set_mode("OFFBOARD", 5)
self.set_arm(True, 5)
@@ -9,26 +9,11 @@ from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, State, \
WaypointList
from mavros_msgs.srv import CommandBool, ParamGet, SetMode, WaypointClear, \
WaypointPush
from pymavlink import mavutil
from sensor_msgs.msg import NavSatFix
class MavrosTestCommon(unittest.TestCase):
# dictionaries correspond to mavros ExtendedState msg
LAND_STATES = {
0: 'UNDEFINED',
1: 'ON_GROUND',
2: 'IN_AIR',
3: 'TAKEOFF',
4: 'LANDING'
}
VTOL_STATES = {
0: 'VTOL UNDEFINED',
1: 'VTOL MC-to-FW',
2: 'VTOL FW-to-MC',
3: 'VTOL MC',
4: 'VTOL FW'
}
def __init__(self, *args):
super(MavrosTestCommon, self).__init__(*args)
@@ -91,6 +76,9 @@ class MavrosTestCommon(unittest.TestCase):
self.state_sub = rospy.Subscriber('mavros/state', State,
self.state_callback)
def tearDown(self):
self.log_topic_vars()
#
# Callback functions
#
@@ -104,13 +92,15 @@ class MavrosTestCommon(unittest.TestCase):
def extended_state_callback(self, data):
if self.extended_state.vtol_state != data.vtol_state:
rospy.loginfo("VTOL state changed from {0} to {1}".format(
self.VTOL_STATES.get(self.extended_state.vtol_state),
self.VTOL_STATES.get(data.vtol_state)))
mavutil.mavlink.enums['MAV_VTOL_STATE']
[self.extended_state.vtol_state].name, mavutil.mavlink.enums[
'MAV_VTOL_STATE'][data.vtol_state].name))
if self.extended_state.landed_state != data.landed_state:
rospy.loginfo("landed state changed from {0} to {1}".format(
self.LAND_STATES.get(self.extended_state.landed_state),
self.LAND_STATES.get(data.landed_state)))
mavutil.mavlink.enums['MAV_LANDED_STATE']
[self.extended_state.landed_state].name, mavutil.mavlink.enums[
'MAV_LANDED_STATE'][data.landed_state].name))
self.extended_state = data
@@ -150,10 +140,20 @@ class MavrosTestCommon(unittest.TestCase):
rospy.loginfo("armed state changed from {0} to {1}".format(
self.state.armed, data.armed))
if self.state.connected != data.connected:
rospy.loginfo("connected changed from {0} to {1}".format(
self.state.connected, data.connected))
if self.state.mode != data.mode:
rospy.loginfo("mode changed from {0} to {1}".format(
self.state.mode, data.mode))
if self.state.system_status != data.system_status:
rospy.loginfo("system_status changed from {0} to {1}".format(
mavutil.mavlink.enums['MAV_STATE'][
self.state.system_status].name, mavutil.mavlink.enums[
'MAV_STATE'][data.system_status].name))
self.state = data
# mavros publishes a disconnected state message on init
@@ -173,9 +173,8 @@ class MavrosTestCommon(unittest.TestCase):
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo(
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
format(arm, old_arm, i / loop_freq, timeout))
rospy.loginfo("set arm success | seconds: {0} of {1}".format(
i / loop_freq, timeout))
break
else:
try:
@@ -201,9 +200,8 @@ class MavrosTestCommon(unittest.TestCase):
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo(
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
format(mode, old_mode, i / loop_freq, timeout))
rospy.loginfo("set mode success | seconds: {0} of {1}".format(
i / loop_freq, timeout))
break
else:
try:
@@ -240,53 +238,52 @@ class MavrosTestCommon(unittest.TestCase):
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
format(self.sub_topics_ready, timeout)))
def wait_on_landed_state(self, desired_landed_state, timeout, index):
rospy.loginfo(
"waiting for landed state | state: {0}, index: {1}".format(
self.LAND_STATES.get(desired_landed_state), index))
def wait_for_landed_state(self, desired_landed_state, timeout, index):
rospy.loginfo("waiting for landed state | state: {0}, index: {1}".
format(mavutil.mavlink.enums['MAV_LANDED_STATE'][
desired_landed_state].name, index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
landed_state_confirmed = False
for i in xrange(timeout * loop_freq):
if self.extended_state.landed_state == desired_landed_state:
landed_state_confirmed = True
rospy.loginfo(
"landed state confirmed | state: {0}, index: {1} | seconds: {2} of {3}".
format(
self.LAND_STATES.get(desired_landed_state), index, i /
loop_freq, timeout))
rospy.loginfo("landed state confirmed | seconds: {0} of {1}".
format(i / loop_freq, timeout))
break
rate.sleep()
self.assertTrue(landed_state_confirmed, (
"landed state not detected | desired: {0}, current: {1} | index: {2}, timeout(seconds): {3}".
format(
self.LAND_STATES.get(desired_landed_state),
self.LAND_STATES.get(self.extended_state.landed_state), index,
timeout)))
format(mavutil.mavlink.enums['MAV_LANDED_STATE'][
desired_landed_state].name, mavutil.mavlink.enums[
'MAV_VTOL_STATE'][self.extended_state.landed_state].name,
index, timeout)))
def wait_on_transition(self, transition, timeout, index):
def wait_for_vtol_state(self, transition, timeout, index):
"""Wait for VTOL transition, timeout(int): seconds"""
rospy.loginfo(
"waiting for VTOL transition | transition: {0}, index: {1}".format(
self.VTOL_STATES.get(transition), index))
mavutil.mavlink.enums['MAV_VTOL_STATE'][
transition].name, index))
loop_freq = 10 # Hz
rate = rospy.Rate(loop_freq)
transitioned = False
for i in xrange(timeout * loop_freq):
if transition == self.extended_state.vtol_state:
rospy.loginfo(
"transitioned | index: {0} | seconds: {1} of {2}".format(
index, i / loop_freq, timeout))
rospy.loginfo("transitioned | seconds: {0} of {1}".format(
i / loop_freq, timeout))
transitioned = True
break
rate.sleep()
self.assertTrue(transitioned, (
"transition not detected | index: {0} | timeout(seconds): {1}".
format(index, timeout)))
"transition not detected | desired: {0}, current: {1} | index: {2} timeout(seconds): {3}".
format(mavutil.mavlink.enums['MAV_VTOL_STATE'][transition].name,
mavutil.mavlink.enums['MAV_VTOL_STATE'][
self.extended_state.vtol_state].name, index, timeout)))
def clear_wps(self, timeout):
"""timeout(int): seconds"""
@@ -362,8 +359,9 @@ class MavrosTestCommon(unittest.TestCase):
if res.success:
self.mav_type = res.value.integer
rospy.loginfo(
"MAV_TYPE received | value: {0} | seconds: {1} of {2}".
format(self.mav_type, i / loop_freq, timeout))
"MAV_TYPE received | type: {0} | seconds: {1} of {2}".
format(mavutil.mavlink.enums['MAV_TYPE'][self.mav_type]
.name, i / loop_freq, timeout))
break
except rospy.ServiceException as e:
rospy.logerr(e)
@@ -373,3 +371,23 @@ class MavrosTestCommon(unittest.TestCase):
self.assertTrue(res.success, (
"MAV_TYPE param get failed | timeout(seconds): {0}".format(timeout)
))
def log_topic_vars(self):
"""log the state of topic variables"""
rospy.loginfo("========================")
rospy.loginfo("===== topic values =====")
rospy.loginfo("========================")
rospy.loginfo("altitude:\n{}".format(self.altitude))
rospy.loginfo("========================")
rospy.loginfo("extended_state:\n{}".format(self.extended_state))
rospy.loginfo("========================")
rospy.loginfo("global_position:\n{}".format(self.global_position))
rospy.loginfo("========================")
rospy.loginfo("home_position:\n{}".format(self.home_position))
rospy.loginfo("========================")
rospy.loginfo("local_position:\n{}".format(self.local_position))
rospy.loginfo("========================")
rospy.loginfo("mission_wp:\n{}".format(self.mission_wp))
rospy.loginfo("========================")
rospy.loginfo("state:\n{}".format(self.state))
rospy.loginfo("========================")
@@ -51,7 +51,7 @@ import px4tools
import sys
from mavros import mavlink
from mavros.mission import QGroundControlWP
from mavros_msgs.msg import ExtendedState, Mavlink, Waypoint
from mavros_msgs.msg import Mavlink, Waypoint
from mavros_test_common import MavrosTestCommon
from pymavlink import mavutil
from threading import Thread
@@ -151,11 +151,9 @@ class MavrosMissionTest(MavrosTestCommon):
def setUp(self):
super(self.__class__, self).setUp()
self.mc_rad = 5
self.mc_rad = 5 # meters
self.fw_rad = 60
self.fw_alt_rad = 10
self.last_alt_d = None
self.last_pos_d = None
self.mission_name = ""
self.mavlink_pub = rospy.Publisher('mavlink/to', Mavlink, queue_size=1)
@@ -170,7 +168,7 @@ class MavrosMissionTest(MavrosTestCommon):
self.hb_thread.start()
def tearDown(self):
pass
super(MavrosMissionTest, self).tearDown()
#
# Helper methods
@@ -200,65 +198,73 @@ class MavrosMissionTest(MavrosTestCommon):
d = R * c
alt_d = abs(alt - self.altitude.amsl)
# remember best distances
if not self.last_pos_d or self.last_pos_d > d:
self.last_pos_d = d
if not self.last_alt_d or self.last_alt_d > alt_d:
self.last_alt_d = alt_d
rospy.logdebug("d: {0}, alt_d: {1}".format(d, alt_d))
return d < xy_offset and alt_d < z_offset
return (d < xy_offset and alt_d < z_offset), d, alt_d
def reach_position(self, lat, lon, alt, timeout, index):
"""alt(amsl): meters, timeout(int): seconds"""
# reset best distances
self.last_alt_d = None
self.last_pos_d = None
rospy.loginfo(
"trying to reach waypoint | lat: {0:.9f}, lon: {1:.9f}, alt: {2:.2f}, index: {3}".
format(lat, lon, alt, index))
best_pos_xy_d = None
best_pos_z_d = None
fcu_advanced = False
# does it reach the position in 'timeout' seconds?
loop_freq = 2 # Hz
rate = rospy.Rate(loop_freq)
reached = False
for i in xrange(timeout * loop_freq):
# use FW radius if VTOL in FW or transition or FW
if (self.mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING or
self.extended_state.vtol_state ==
ExtendedState.VTOL_STATE_FW or
mavutil.mavlink.MAV_VTOL_STATE_FW or
self.extended_state.vtol_state ==
ExtendedState.VTOL_STATE_TRANSITION_TO_MC or
mavutil.mavlink.MAV_VTOL_STATE_TRANSITION_TO_MC or
self.extended_state.vtol_state ==
ExtendedState.VTOL_STATE_TRANSITION_TO_FW):
mavutil.mavlink.MAV_VTOL_STATE_TRANSITION_TO_FW):
xy_radius = self.fw_rad
z_radius = self.fw_alt_rad
else: # assume MC
xy_radius = self.mc_rad
z_radius = self.mc_rad
if self.is_at_position(lat, lon, alt, xy_radius, z_radius):
reached = True
reached, pos_xy_d, pos_z_d = self.is_at_position(
lat, lon, alt, xy_radius, z_radius)
# remember best distances
if not best_pos_xy_d or best_pos_xy_d > pos_xy_d:
best_pos_xy_d = pos_xy_d
if not best_pos_z_d or best_pos_z_d > pos_z_d:
best_pos_z_d = pos_z_d
if reached:
rospy.loginfo(
"position reached | pos_d: {0:.2f}, alt_d: {1:.2f}, index: {2} | seconds: {3} of {4}".
format(self.last_pos_d, self.last_alt_d, index, i /
loop_freq, timeout))
"position reached | pos_xy_d: {0:.2f}, pos_z_d: {1:.2f}, index: {2} | seconds: {3} of {4}".
format(pos_xy_d, pos_z_d, index, i / loop_freq, timeout))
break
elif not fcu_advanced and (index < self.mission_wp.current_seq or (
index == len(self.mission_wp.waypoints) and
self.mission_wp.current_seq == 0)):
# FCU advanced to the next mission item, or finished mission
fcu_advanced = True
rospy.loginfo(
"FCU advanced to mission item index {0} when checking position | xy off: {1}, z off: {2}, pos_xy_d: {3:.2f}, pos_z_d: {4:.2f},".
format(self.mission_wp.current_seq, xy_radius, z_radius,
pos_xy_d, pos_z_d))
rate.sleep()
self.assertTrue(reached, (
"took too long to get to position | lat: {0:.9f}, lon: {1:.9f}, alt: {2:.2f}, xy off: {3}, z off: {4}, pos_d: {5:.2f}, alt_d: {6:.2f}, index: {7} | timeout(seconds): {8}".
format(lat, lon, alt, xy_radius, z_radius, self.last_pos_d,
self.last_alt_d, index, timeout)))
self.assertTrue(
reached,
"took too long to get to position | lat: {0:.9f}, lon: {1:.9f}, alt: {2:.2f}, xy off: {3}, z off: {4}, current pos_xy_d: {5:.2f}, current pos_z_d: {6:.2f}, best pos_xy_d: {7:.2f}, best pos_z_d: {8:.2f}, index: {9} | timeout(seconds): {10}".
format(lat, lon, alt, xy_radius, z_radius, pos_xy_d, pos_z_d,
best_pos_xy_d, best_pos_z_d, index, timeout))
#
# Test method
#
def test_mission(self):
"""Test mission"""
if len(sys.argv) < 2:
self.fail("usage: mission_test.py mission_file")
return
@@ -275,18 +281,21 @@ class MavrosMissionTest(MavrosTestCommon):
# make sure the simulation is ready to start the mission
self.wait_for_topics(60)
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
self.wait_for_landed_state(mavutil.mavlink.MAV_LANDED_STATE_ON_GROUND,
10, -1)
self.wait_for_mav_type(10)
# push waypoints to FCU and start mission
self.send_wps(wps, 30)
self.log_topic_vars()
self.set_mode("AUTO.MISSION", 5)
self.set_arm(True, 5)
rospy.loginfo("run mission {0}".format(self.mission_name))
for index, waypoint in enumerate(wps):
# only check position for waypoints where this makes sense
if waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT or waypoint.frame == Waypoint.FRAME_GLOBAL:
if (waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT or
waypoint.frame == Waypoint.FRAME_GLOBAL):
alt = waypoint.z_alt
if waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT:
alt += self.altitude.amsl - self.altitude.relative
@@ -295,21 +304,23 @@ class MavrosMissionTest(MavrosTestCommon):
index)
# check if VTOL transition happens if applicable
if waypoint.command == 84 or waypoint.command == 85 or waypoint.command == 3000:
transition = waypoint.param1
if (waypoint.command == mavutil.mavlink.MAV_CMD_NAV_VTOL_TAKEOFF or
waypoint.command == mavutil.mavlink.MAV_CMD_NAV_VTOL_LAND
or waypoint.command ==
mavutil.mavlink.MAV_CMD_DO_VTOL_TRANSITION):
transition = waypoint.param1 # used by MAV_CMD_DO_VTOL_TRANSITION
if waypoint.command == mavutil.mavlink.MAV_CMD_NAV_VTOL_TAKEOFF: # VTOL takeoff implies transition to FW
transition = mavutil.mavlink.MAV_VTOL_STATE_FW
if waypoint.command == mavutil.mavlink.MAV_CMD_NAV_VTOL_LAND: # VTOL land implies transition to MC
transition = mavutil.mavlink.MAV_VTOL_STATE_MC
if waypoint.command == 84: # VTOL takeoff implies transition to FW
transition = ExtendedState.VTOL_STATE_FW
if waypoint.command == 85: # VTOL land implies transition to MC
transition = ExtendedState.VTOL_STATE_MC
self.wait_on_transition(transition, 60, index)
self.wait_for_vtol_state(transition, 60, index)
# after reaching position, wait for landing detection if applicable
if waypoint.command == 85 or waypoint.command == 21:
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND,
60, index)
if (waypoint.command == mavutil.mavlink.MAV_CMD_NAV_VTOL_LAND or
waypoint.command == mavutil.mavlink.MAV_CMD_NAV_LAND):
self.wait_for_landed_state(
mavutil.mavlink.MAV_LANDED_STATE_ON_GROUND, 60, index)
self.set_arm(False, 5)
self.clear_wps(5)