mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-05-13 04:17:35 +08:00
thread for offboard publishers, add asserts for topics to come up (simulation ready) and set mode and arming, use home_position topic as better indicator of when the simulation is ready, add more feedback to rosinfo, make timeouts meaningful (in seconds), add land and extended state values
This commit is contained in:
committed by
Lorenz Meier
parent
5ce381dfc7
commit
f9e7c66718
@@ -37,16 +37,17 @@
|
||||
#
|
||||
# The shebang of this file is currently Python2 because some
|
||||
# dependencies such as pymavlink don't play well with Python3 yet.
|
||||
from __future__ import division
|
||||
|
||||
PKG = 'px4'
|
||||
|
||||
import unittest
|
||||
import rospy
|
||||
from threading import Thread
|
||||
from tf.transformations import quaternion_from_euler
|
||||
from geometry_msgs.msg import PoseStamped, Quaternion, Vector3
|
||||
from mavros_msgs.msg import AttitudeTarget, State
|
||||
from mavros_msgs.msg import AttitudeTarget, HomePosition, State
|
||||
from mavros_msgs.srv import CommandBool, SetMode
|
||||
from sensor_msgs.msg import NavSatFix
|
||||
from std_msgs.msg import Header
|
||||
|
||||
|
||||
@@ -59,48 +60,161 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.rate = rospy.Rate(10) # 10hz
|
||||
self.has_global_pos = False
|
||||
self.local_position = PoseStamped()
|
||||
self.state = State()
|
||||
self.att = AttitudeTarget()
|
||||
self.sub_topics_ready = {
|
||||
key: False
|
||||
for key in ['local_pos', 'home_pos', 'state']
|
||||
}
|
||||
|
||||
# setup ROS topics and services
|
||||
rospy.wait_for_service('mavros/cmd/arming', 30)
|
||||
rospy.wait_for_service('mavros/set_mode', 30)
|
||||
try:
|
||||
rospy.wait_for_service('mavros/cmd/arming', 30)
|
||||
rospy.wait_for_service('mavros/set_mode', 30)
|
||||
except rospy.ROSException:
|
||||
self.fail("failed to connect to mavros services")
|
||||
|
||||
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',
|
||||
CommandBool)
|
||||
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)
|
||||
rospy.Subscriber('mavros/local_position/pose', PoseStamped,
|
||||
self.position_callback)
|
||||
rospy.Subscriber('mavros/global_position/global', NavSatFix,
|
||||
self.global_position_callback)
|
||||
rospy.Subscriber('mavros/state', State, self.state_callback)
|
||||
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
|
||||
PoseStamped,
|
||||
self.local_position_callback)
|
||||
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
|
||||
HomePosition,
|
||||
self.home_position_callback)
|
||||
self.state_sub = rospy.Subscriber('mavros/state', State,
|
||||
self.state_callback)
|
||||
self.att_setpoint_pub = rospy.Publisher(
|
||||
'mavros/setpoint_raw/attitude', AttitudeTarget, queue_size=10)
|
||||
|
||||
# send setpoints in seperate thread to better prevent failsafe
|
||||
self.att_thread = Thread(target=self.send_att, args=())
|
||||
self.att_thread.daemon = True
|
||||
self.att_thread.start()
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
#
|
||||
# General callback functions used in tests
|
||||
# Callback functions
|
||||
#
|
||||
def position_callback(self, data):
|
||||
def local_position_callback(self, data):
|
||||
self.local_position = data
|
||||
|
||||
def global_position_callback(self, data):
|
||||
self.has_global_pos = True
|
||||
if not self.sub_topics_ready['local_pos']:
|
||||
self.sub_topics_ready['local_pos'] = True
|
||||
|
||||
def home_position_callback(self, data):
|
||||
# this topic publishing seems to be a better indicator that the sim
|
||||
# is ready, it's not actually needed
|
||||
self.home_pos_sub.unregister()
|
||||
|
||||
if not self.sub_topics_ready['home_pos']:
|
||||
self.sub_topics_ready['home_pos'] = True
|
||||
|
||||
def state_callback(self, data):
|
||||
self.state = data
|
||||
|
||||
if not self.sub_topics_ready['state']:
|
||||
self.sub_topics_ready['state'] = True
|
||||
|
||||
#
|
||||
# Helper methods
|
||||
#
|
||||
def wait_until_ready(self):
|
||||
"""FIXME: hack to wait for simulation to be ready"""
|
||||
rospy.loginfo("waiting for global position")
|
||||
while not self.has_global_pos:
|
||||
self.rate.sleep()
|
||||
def send_att(self):
|
||||
rate = rospy.Rate(10) # Hz
|
||||
self.att.body_rate = Vector3()
|
||||
self.att.header = Header()
|
||||
self.att.header.frame_id = "base_footprint"
|
||||
self.att.orientation = Quaternion(*quaternion_from_euler(0.25, 0.25,
|
||||
0))
|
||||
self.att.thrust = 0.7
|
||||
self.att.type_mask = 7 # ignore body rate
|
||||
|
||||
while not rospy.is_shutdown():
|
||||
self.att.header.stamp = rospy.Time.now()
|
||||
self.att_setpoint_pub.publish(self.att)
|
||||
try: # prevent garbage in console output when thread is killed
|
||||
rate.sleep()
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
|
||||
def set_mode(self, mode, timeout):
|
||||
"""mode: PX4 mode string, timeout(int): seconds"""
|
||||
old_mode = self.state.mode
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
mode_set = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.state.mode == mode:
|
||||
mode_set = True
|
||||
rospy.loginfo(
|
||||
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
|
||||
format(mode, self.state.mode, i / loop_freq, timeout))
|
||||
break
|
||||
else:
|
||||
try:
|
||||
res = self.set_mode_srv(0, mode) # 0 is custom mode
|
||||
if not res.mode_sent:
|
||||
rospy.logerr("failed to send mode command")
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(mode_set, (
|
||||
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
|
||||
format(mode, old_mode, timeout)))
|
||||
|
||||
def set_arm(self, arm, timeout):
|
||||
"""arm: True to arm or False to disarm, timeout(int): seconds"""
|
||||
old_arm = self.state.armed
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
arm_set = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.state.armed == arm:
|
||||
arm_set = True
|
||||
rospy.loginfo(
|
||||
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
|
||||
format(arm, old_arm, i / loop_freq, timeout))
|
||||
break
|
||||
else:
|
||||
try:
|
||||
res = self.set_arming_srv(arm)
|
||||
if not res.success:
|
||||
rospy.logerr("failed to send arm command")
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(arm_set, (
|
||||
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
|
||||
format(arm, self.state.armed, timeout)))
|
||||
|
||||
def wait_for_topics(self, timeout):
|
||||
"""wait for simulation to be ready, make sure we're getting topic info
|
||||
from all topics by checking dictionary of flag values set in callbacks,
|
||||
timeout(int): seconds"""
|
||||
rospy.loginfo("waiting for simulation topics to be ready")
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
simulation_ready = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if all(value for value in self.sub_topics_ready.values()):
|
||||
simulation_ready = True
|
||||
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
|
||||
format(i / loop_freq, timeout))
|
||||
break
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(simulation_ready, (
|
||||
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
|
||||
format(self.sub_topics_ready, timeout)))
|
||||
|
||||
#
|
||||
# Test method
|
||||
@@ -108,67 +222,39 @@ class MavrosOffboardAttctlTest(unittest.TestCase):
|
||||
def test_attctl(self):
|
||||
"""Test offboard attitude control"""
|
||||
|
||||
self.wait_until_ready()
|
||||
# delay starting the mission
|
||||
self.wait_for_topics(30)
|
||||
|
||||
# set some attitude and thrust
|
||||
att = AttitudeTarget()
|
||||
att.header = Header()
|
||||
att.header.frame_id = "base_footprint"
|
||||
att.header.stamp = rospy.Time.now()
|
||||
att.orientation = Quaternion(*quaternion_from_euler(0.25, 0.25, 0))
|
||||
att.body_rate = Vector3()
|
||||
att.thrust = 0.7
|
||||
att.type_mask = 7
|
||||
|
||||
# send some setpoints before starting
|
||||
for i in xrange(20):
|
||||
att.header.stamp = rospy.Time.now()
|
||||
self.att_setpoint_pub.publish(att)
|
||||
self.rate.sleep()
|
||||
|
||||
rospy.loginfo("set mission mode and arm")
|
||||
while self.state.mode != "OFFBOARD" or not self.state.armed:
|
||||
if self.state.mode != "OFFBOARD":
|
||||
try:
|
||||
self.set_mode_srv(0, 'OFFBOARD')
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
if not self.state.armed:
|
||||
try:
|
||||
self.set_arming_srv(True)
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
rospy.sleep(2)
|
||||
rospy.loginfo("seting mission mode")
|
||||
self.set_mode("OFFBOARD", 5)
|
||||
rospy.loginfo("arming")
|
||||
self.set_arm(True, 5)
|
||||
|
||||
rospy.loginfo("run mission")
|
||||
# does it cross expected boundaries in X seconds?
|
||||
timeout = 120
|
||||
# does it cross expected boundaries in 'timeout' seconds?
|
||||
timeout = 12 # (int) seconds
|
||||
loop_freq = 10 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
crossed = False
|
||||
for count in xrange(timeout):
|
||||
# update timestamp for each published SP
|
||||
att.header.stamp = rospy.Time.now()
|
||||
self.att_setpoint_pub.publish(att)
|
||||
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if (self.local_position.pose.position.x > 5 and
|
||||
self.local_position.pose.position.z > 5 and
|
||||
self.local_position.pose.position.y < -5):
|
||||
rospy.loginfo("boundary crossed | count {0}".format(count))
|
||||
rospy.loginfo("boundary crossed | seconds: {0} of {1}".format(
|
||||
i / loop_freq, timeout))
|
||||
crossed = True
|
||||
break
|
||||
|
||||
self.rate.sleep()
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(crossed, (
|
||||
"took too long to cross boundaries | x: {0}, y: {1}, z: {2}, timeout: {3}".
|
||||
"took too long to cross boundaries | x: {0}, y: {1}, z: {2} | timeout(seconds): {3}".
|
||||
format(self.local_position.pose.position.x,
|
||||
self.local_position.pose.position.y,
|
||||
self.local_position.pose.position.z, timeout)))
|
||||
|
||||
if self.state.armed:
|
||||
try:
|
||||
self.set_arming_srv(False)
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
rospy.loginfo("disarming")
|
||||
self.set_arm(False, 5)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
#
|
||||
# The shebang of this file is currently Python2 because some
|
||||
# dependencies such as pymavlink don't play well with Python3 yet.
|
||||
from __future__ import division
|
||||
|
||||
PKG = 'px4'
|
||||
|
||||
@@ -44,11 +45,11 @@ import unittest
|
||||
import rospy
|
||||
import math
|
||||
import numpy as np
|
||||
from threading import Thread
|
||||
from tf.transformations import quaternion_from_euler
|
||||
from geometry_msgs.msg import PoseStamped, Quaternion
|
||||
from mavros_msgs.msg import State
|
||||
from mavros_msgs.msg import HomePosition, State
|
||||
from mavros_msgs.srv import CommandBool, SetMode
|
||||
from sensor_msgs.msg import NavSatFix
|
||||
from std_msgs.msg import Header
|
||||
|
||||
|
||||
@@ -58,52 +59,161 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
|
||||
via MAVROS.
|
||||
|
||||
For the test to be successful it needs to reach all setpoints in a certain time.
|
||||
|
||||
FIXME: add flight path assertion (needs transformation from ROS frame to NED)
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.rate = rospy.Rate(10) # 10hz
|
||||
self.has_global_pos = False
|
||||
self.local_position = PoseStamped()
|
||||
self.state = State()
|
||||
self.pos = PoseStamped()
|
||||
self.sub_topics_ready = {
|
||||
key: False
|
||||
for key in ['local_pos', 'home_pos', 'state']
|
||||
}
|
||||
|
||||
# setup ROS topics and services
|
||||
rospy.wait_for_service('mavros/cmd/arming', 30)
|
||||
rospy.wait_for_service('mavros/set_mode', 30)
|
||||
try:
|
||||
rospy.wait_for_service('mavros/cmd/arming', 30)
|
||||
rospy.wait_for_service('mavros/set_mode', 30)
|
||||
except rospy.ROSException:
|
||||
self.fail("failed to connect to mavros services")
|
||||
|
||||
self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming',
|
||||
CommandBool)
|
||||
self.set_mode_srv = rospy.ServiceProxy('/mavros/set_mode', SetMode)
|
||||
rospy.Subscriber('mavros/local_position/pose', PoseStamped,
|
||||
self.position_callback)
|
||||
rospy.Subscriber('mavros/global_position/global', NavSatFix,
|
||||
self.global_position_callback)
|
||||
rospy.Subscriber('mavros/state', State, self.state_callback)
|
||||
self.local_pos_sub = rospy.Subscriber('mavros/local_position/pose',
|
||||
PoseStamped,
|
||||
self.local_position_callback)
|
||||
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
|
||||
HomePosition,
|
||||
self.home_position_callback)
|
||||
self.state_sub = rospy.Subscriber('mavros/state', State,
|
||||
self.state_callback)
|
||||
self.pos_setpoint_pub = rospy.Publisher(
|
||||
'mavros/setpoint_position/local', PoseStamped, queue_size=10)
|
||||
|
||||
# send setpoints in seperate thread to better prevent failsafe
|
||||
self.pos_thread = Thread(target=self.send_pos, args=())
|
||||
self.pos_thread.daemon = True
|
||||
self.pos_thread.start()
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
#
|
||||
# General callback functions used in tests
|
||||
# Callback functions
|
||||
#
|
||||
def position_callback(self, data):
|
||||
def local_position_callback(self, data):
|
||||
self.local_position = data
|
||||
|
||||
def global_position_callback(self, data):
|
||||
self.has_global_pos = True
|
||||
if not self.sub_topics_ready['local_pos']:
|
||||
self.sub_topics_ready['local_pos'] = True
|
||||
|
||||
def home_position_callback(self, data):
|
||||
# this topic publishing seems to be a better indicator that the sim
|
||||
# is ready, it's not actually needed
|
||||
self.home_pos_sub.unregister()
|
||||
|
||||
if not self.sub_topics_ready['home_pos']:
|
||||
self.sub_topics_ready['home_pos'] = True
|
||||
|
||||
def state_callback(self, data):
|
||||
self.state = data
|
||||
|
||||
if not self.sub_topics_ready['state']:
|
||||
self.sub_topics_ready['state'] = True
|
||||
|
||||
#
|
||||
# Helper methods
|
||||
#
|
||||
def wait_until_ready(self):
|
||||
"""FIXME: hack to wait for simulation to be ready"""
|
||||
rospy.loginfo("waiting for global position")
|
||||
while not self.has_global_pos:
|
||||
self.rate.sleep()
|
||||
def send_pos(self):
|
||||
rate = rospy.Rate(10) # Hz
|
||||
self.pos.header = Header()
|
||||
self.pos.header.frame_id = "base_footprint"
|
||||
|
||||
while not rospy.is_shutdown():
|
||||
self.pos.header.stamp = rospy.Time.now()
|
||||
self.pos_setpoint_pub.publish(self.pos)
|
||||
try: # prevent garbage in console output when thread is killed
|
||||
rate.sleep()
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
|
||||
def set_mode(self, mode, timeout):
|
||||
"""mode: PX4 mode string, timeout(int): seconds"""
|
||||
old_mode = self.state.mode
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
mode_set = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.state.mode == mode:
|
||||
mode_set = True
|
||||
rospy.loginfo(
|
||||
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
|
||||
format(mode, old_mode, i / loop_freq, timeout))
|
||||
break
|
||||
else:
|
||||
try:
|
||||
res = self.set_mode_srv(0, mode) # 0 is custom mode
|
||||
if not res.mode_sent:
|
||||
rospy.logerr("failed to send mode command")
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(mode_set, (
|
||||
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
|
||||
format(mode, self.state.mode, timeout)))
|
||||
|
||||
def set_arm(self, arm, timeout):
|
||||
"""arm: True to arm or False to disarm, timeout(int): seconds"""
|
||||
old_arm = self.state.armed
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
arm_set = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.state.armed == arm:
|
||||
arm_set = True
|
||||
rospy.loginfo(
|
||||
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
|
||||
format(arm, old_arm, i / loop_freq, timeout))
|
||||
break
|
||||
else:
|
||||
try:
|
||||
res = self.set_arming_srv(arm)
|
||||
if not res.success:
|
||||
rospy.logerr("failed to send arm command")
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(arm_set, (
|
||||
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
|
||||
format(arm, self.state.armed, timeout)))
|
||||
|
||||
def wait_for_topics(self, timeout):
|
||||
"""wait for simulation to be ready, make sure we're getting topic info
|
||||
from all topics by checking dictionary of flag values set in callbacks,
|
||||
timeout(int): seconds"""
|
||||
rospy.loginfo("waiting for simulation topics to be ready")
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
simulation_ready = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if all(value for value in self.sub_topics_ready.values()):
|
||||
simulation_ready = True
|
||||
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
|
||||
format(i / loop_freq, timeout))
|
||||
break
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(simulation_ready, (
|
||||
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
|
||||
format(self.sub_topics_ready, timeout)))
|
||||
|
||||
def is_at_position(self, x, y, z, offset):
|
||||
rospy.logdebug("current position | x:{0}, y:{1}, z:{2}".format(
|
||||
@@ -118,60 +228,35 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
|
||||
|
||||
def reach_position(self, x, y, z, timeout):
|
||||
# set a position setpoint
|
||||
pos = PoseStamped()
|
||||
pos.header = Header()
|
||||
pos.header.frame_id = "base_footprint"
|
||||
pos.pose.position.x = x
|
||||
pos.pose.position.y = y
|
||||
pos.pose.position.z = z
|
||||
self.pos.pose.position.x = x
|
||||
self.pos.pose.position.y = y
|
||||
self.pos.pose.position.z = z
|
||||
|
||||
# For demo purposes we will lock yaw/heading to north.
|
||||
yaw_degrees = 0 # North
|
||||
yaw = math.radians(yaw_degrees)
|
||||
quaternion = quaternion_from_euler(0, 0, yaw)
|
||||
pos.pose.orientation = Quaternion(*quaternion)
|
||||
self.pos.pose.orientation = Quaternion(*quaternion)
|
||||
|
||||
# send some setpoints before starting
|
||||
for i in xrange(20):
|
||||
pos.header.stamp = rospy.Time.now()
|
||||
self.pos_setpoint_pub.publish(pos)
|
||||
self.rate.sleep()
|
||||
|
||||
rospy.loginfo("set mission mode and arm")
|
||||
while self.state.mode != "OFFBOARD" or not self.state.armed:
|
||||
if self.state.mode != "OFFBOARD":
|
||||
try:
|
||||
self.set_mode_srv(0, 'OFFBOARD')
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
if not self.state.armed:
|
||||
try:
|
||||
self.set_arming_srv(True)
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
rospy.sleep(2)
|
||||
|
||||
rospy.loginfo("run mission")
|
||||
# does it reach the position in X seconds?
|
||||
# does it reach the position in 'timeout' seconds?
|
||||
loop_freq = 10 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
reached = False
|
||||
for count in xrange(timeout):
|
||||
# update timestamp for each published SP
|
||||
pos.header.stamp = rospy.Time.now()
|
||||
self.pos_setpoint_pub.publish(pos)
|
||||
|
||||
if self.is_at_position(pos.pose.position.x, pos.pose.position.y,
|
||||
pos.pose.position.z, 1):
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.is_at_position(self.pos.pose.position.x,
|
||||
self.pos.pose.position.y,
|
||||
self.pos.pose.position.z, 1):
|
||||
rospy.loginfo(
|
||||
"position reached | count: {0}, x: {0}, y: {1}, z: {2}".
|
||||
format(count, pos.pose.position.x, pos.pose.position.y,
|
||||
pos.pose.position.z))
|
||||
"position reached | x: {0}, y: {1}, z: {2} | seconds: {3} of {4}".
|
||||
format(self.pos.pose.position.x, self.pos.pose.position.y,
|
||||
self.pos.pose.position.z, i / loop_freq, timeout))
|
||||
reached = True
|
||||
break
|
||||
|
||||
self.rate.sleep()
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(reached, (
|
||||
"took too long to get to position | x: {0}, y: {1}, z: {2}, timeout: {3}".
|
||||
"took too long to get to position | x: {0}, y: {1}, z: {2} | timeout(seconds): {3}".
|
||||
format(self.local_position.pose.position.x,
|
||||
self.local_position.pose.position.y,
|
||||
self.local_position.pose.position.z, timeout)))
|
||||
@@ -182,19 +267,23 @@ class MavrosOffboardPosctlTest(unittest.TestCase):
|
||||
def test_posctl(self):
|
||||
"""Test offboard position control"""
|
||||
|
||||
self.wait_until_ready()
|
||||
# delay starting the mission
|
||||
self.wait_for_topics(30)
|
||||
|
||||
rospy.loginfo("seting mission mode")
|
||||
self.set_mode("OFFBOARD", 5)
|
||||
rospy.loginfo("arming")
|
||||
self.set_arm(True, 5)
|
||||
|
||||
rospy.loginfo("run mission")
|
||||
positions = ((0, 0, 0), (2, 2, 2), (2, -2, 2), (-2, -2, 2), (2, 2, 2))
|
||||
|
||||
for i in xrange(len(positions)):
|
||||
self.reach_position(positions[i][0], positions[i][1],
|
||||
positions[i][2], 180)
|
||||
positions[i][2], 18)
|
||||
|
||||
if self.state.armed:
|
||||
try:
|
||||
self.set_arming_srv(False)
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
rospy.loginfo("disarming")
|
||||
self.set_arm(False, 5)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
|
||||
# The shebang of this file is currently Python2 because some
|
||||
# dependencies such as pymavlink don't play well with Python3 yet.
|
||||
from __future__ import division
|
||||
|
||||
PKG = 'px4'
|
||||
|
||||
@@ -53,8 +54,8 @@ from mavros import mavlink
|
||||
from mavros.mission import QGroundControlWP
|
||||
from pymavlink import mavutil
|
||||
from threading import Thread
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from mavros_msgs.msg import Altitude, ExtendedState, Mavlink, State, Waypoint
|
||||
from mavros_msgs.msg import Altitude, ExtendedState, HomePosition, Mavlink, \
|
||||
State, Waypoint
|
||||
from mavros_msgs.srv import CommandBool, SetMode, WaypointPush
|
||||
from sensor_msgs.msg import NavSatFix
|
||||
|
||||
@@ -94,11 +95,25 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
"""
|
||||
Run a mission
|
||||
"""
|
||||
# dictionaries correspond to mavros ExtendedState msg
|
||||
LAND_STATES = {
|
||||
0: 'UNDEFINED',
|
||||
1: 'ON_GROUND',
|
||||
2: 'IN_AIR',
|
||||
3: 'TAKEOFF',
|
||||
4: 'LANDING'
|
||||
}
|
||||
VTOL_STATES = {
|
||||
0: 'VTOL UNDEFINED',
|
||||
1: 'VTOL MC->FW',
|
||||
2: 'VTOL FW->MC',
|
||||
3: 'VTOL MC',
|
||||
4: 'VTOL FW'
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self.rate = rospy.Rate(10) # 10hz
|
||||
self.has_global_pos = False
|
||||
self.local_position = PoseStamped()
|
||||
self.global_position = NavSatFix()
|
||||
self.extended_state = ExtendedState()
|
||||
self.altitude = Altitude()
|
||||
@@ -109,24 +124,37 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
self.last_alt_d = None
|
||||
self.last_pos_d = None
|
||||
self.mission_name = ""
|
||||
self.sub_topics_ready = {
|
||||
key: False
|
||||
for key in ['global_pos', 'home_pos', 'ext_state', 'alt', 'state']
|
||||
}
|
||||
|
||||
# setup ROS topics and services
|
||||
rospy.wait_for_service('mavros/mission/push', 30)
|
||||
rospy.wait_for_service('mavros/cmd/arming', 30)
|
||||
rospy.wait_for_service('mavros/set_mode', 30)
|
||||
try:
|
||||
rospy.wait_for_service('mavros/mission/push', 30)
|
||||
rospy.wait_for_service('mavros/cmd/arming', 30)
|
||||
rospy.wait_for_service('mavros/set_mode', 30)
|
||||
except rospy.ROSException:
|
||||
self.fail("failed to connect to mavros services")
|
||||
|
||||
self.wp_push_srv = rospy.ServiceProxy('mavros/mission/push',
|
||||
WaypointPush)
|
||||
self.set_arming_srv = rospy.ServiceProxy('/mavros/cmd/arming',
|
||||
CommandBool)
|
||||
self.set_mode_srv = rospy.ServiceProxy('/mavros/set_mode', SetMode)
|
||||
rospy.Subscriber('mavros/local_position/pose', PoseStamped,
|
||||
self.position_callback)
|
||||
rospy.Subscriber('mavros/global_position/global', NavSatFix,
|
||||
self.global_position_callback)
|
||||
rospy.Subscriber('mavros/extended_state', ExtendedState,
|
||||
self.extended_state_callback)
|
||||
rospy.Subscriber('mavros/altitude', Altitude, self.altitude_callback)
|
||||
rospy.Subscriber('mavros/state', State, self.state_callback)
|
||||
self.global_pos_sub = rospy.Subscriber('mavros/global_position/global',
|
||||
NavSatFix,
|
||||
self.global_position_callback)
|
||||
self.home_pos_sub = rospy.Subscriber('mavros/home_position/home',
|
||||
HomePosition,
|
||||
self.home_position_callback)
|
||||
self.ext_state_sub = rospy.Subscriber('mavros/extended_state',
|
||||
ExtendedState,
|
||||
self.extended_state_callback)
|
||||
self.alt_sub = rospy.Subscriber('mavros/altitude', Altitude,
|
||||
self.altitude_callback)
|
||||
self.state_sub = rospy.Subscriber('mavros/state', State,
|
||||
self.state_callback)
|
||||
self.mavlink_pub = rospy.Publisher('mavlink/to', Mavlink, queue_size=1)
|
||||
|
||||
# need to simulate heartbeat to prevent datalink loss detection
|
||||
@@ -142,43 +170,127 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
pass
|
||||
|
||||
#
|
||||
# General callback functions used in tests
|
||||
# Callback functions
|
||||
#
|
||||
def position_callback(self, data):
|
||||
self.local_position = data
|
||||
|
||||
def global_position_callback(self, data):
|
||||
self.global_position = data
|
||||
|
||||
if not self.has_global_pos:
|
||||
self.has_global_pos = True
|
||||
if not self.sub_topics_ready['global_pos']:
|
||||
self.sub_topics_ready['global_pos'] = True
|
||||
|
||||
def home_position_callback(self, data):
|
||||
# this topic publishing seems to be a better indicator that the sim
|
||||
# is ready, it's not actually needed
|
||||
self.home_pos_sub.unregister()
|
||||
|
||||
if not self.sub_topics_ready['home_pos']:
|
||||
self.sub_topics_ready['home_pos'] = True
|
||||
|
||||
def extended_state_callback(self, data):
|
||||
prev_state = self.extended_state.vtol_state
|
||||
if self.extended_state.vtol_state != data.vtol_state:
|
||||
rospy.loginfo("VTOL state changed from {0} to {1}".format(
|
||||
self.VTOL_STATES.get(self.extended_state.vtol_state),
|
||||
self.VTOL_STATES.get(data.vtol_state)))
|
||||
|
||||
if self.extended_state.landed_state != data.landed_state:
|
||||
rospy.loginfo("landed state changed from {0} to {1}".format(
|
||||
self.LAND_STATES.get(self.extended_state.landed_state),
|
||||
self.LAND_STATES.get(data.landed_state)))
|
||||
|
||||
self.extended_state = data
|
||||
|
||||
if (prev_state != self.extended_state.vtol_state):
|
||||
rospy.loginfo("VTOL state change: {0}".format(
|
||||
self.extended_state.vtol_state))
|
||||
if not self.sub_topics_ready['ext_state']:
|
||||
self.sub_topics_ready['ext_state'] = True
|
||||
|
||||
def state_callback(self, data):
|
||||
if self.state.armed != data.armed:
|
||||
rospy.loginfo("armed state changed from {0} to {1}".format(
|
||||
self.state.armed, data.armed))
|
||||
|
||||
if self.state.mode != data.mode:
|
||||
rospy.loginfo("mode changed from {0} to {1}".format(
|
||||
self.state.mode, data.mode))
|
||||
|
||||
self.state = data
|
||||
|
||||
if not self.sub_topics_ready['state']:
|
||||
self.sub_topics_ready['state'] = True
|
||||
|
||||
def altitude_callback(self, data):
|
||||
self.altitude = data
|
||||
|
||||
# amsl has been observed to be nan while other fields are valid
|
||||
if not self.sub_topics_ready['alt'] and not math.isnan(data.amsl):
|
||||
self.sub_topics_ready['alt'] = True
|
||||
|
||||
#
|
||||
# Helper methods
|
||||
#
|
||||
def send_heartbeat(self):
|
||||
rate = rospy.Rate(2) # Hz
|
||||
while not rospy.is_shutdown():
|
||||
self.mavlink_pub.publish(self.hb_ros_msg)
|
||||
try:
|
||||
rospy.sleep(0.5)
|
||||
try: # prevent garbage in console output when thread is killed
|
||||
rate.sleep()
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
|
||||
def set_mode(self, mode, timeout):
|
||||
"""mode: PX4 mode string, timeout(int): seconds"""
|
||||
old_mode = self.state.mode
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
mode_set = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.state.mode == mode:
|
||||
mode_set = True
|
||||
rospy.loginfo(
|
||||
"set mode success | new mode: {0}, old mode: {1} | seconds: {2} of {3}".
|
||||
format(mode, old_mode, i / loop_freq, timeout))
|
||||
break
|
||||
else:
|
||||
try:
|
||||
res = self.set_mode_srv(0, mode) # 0 is custom mode
|
||||
if not res.mode_sent:
|
||||
rospy.logerr("failed to send mode command")
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(mode_set, (
|
||||
"failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
|
||||
format(mode, old_mode, timeout)))
|
||||
|
||||
def set_arm(self, arm, timeout):
|
||||
"""arm: True to arm or False to disarm, timeout(int): seconds"""
|
||||
old_arm = self.state.armed
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
arm_set = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.state.armed == arm:
|
||||
arm_set = True
|
||||
rospy.loginfo(
|
||||
"set arm success | new arm: {0}, old arm: {1} | seconds: {2} of {3}".
|
||||
format(arm, old_arm, i / loop_freq, timeout))
|
||||
break
|
||||
else:
|
||||
try:
|
||||
res = self.set_arming_srv(arm)
|
||||
if not res.success:
|
||||
rospy.logerr("failed to send arm command")
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(arm_set, (
|
||||
"failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
|
||||
format(arm, old_arm, timeout)))
|
||||
|
||||
def is_at_position(self, lat, lon, alt, xy_offset, z_offset):
|
||||
"""alt(amsl), xy_offset, z_offset: meters"""
|
||||
R = 6371000 # metres
|
||||
rlat1 = math.radians(lat)
|
||||
rlat2 = math.radians(self.global_position.latitude)
|
||||
@@ -203,18 +315,20 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
return d < xy_offset and alt_d < z_offset
|
||||
|
||||
def reach_position(self, lat, lon, alt, timeout, index):
|
||||
"""alt(amsl): meters, timeout(int): seconds"""
|
||||
# reset best distances
|
||||
self.last_alt_d = None
|
||||
self.last_pos_d = None
|
||||
|
||||
rospy.loginfo(
|
||||
"trying to reach waypoint | " +
|
||||
"lat: {0:13.9f}, lon: {1:13.9f}, alt: {2:6.2f}, timeout: {3}, index: {4}".
|
||||
format(lat, lon, alt, timeout, index))
|
||||
"trying to reach waypoint | lat: {0:13.9f}, lon: {1:13.9f}, alt: {2:6.2f}, index: {3}".
|
||||
format(lat, lon, alt, index))
|
||||
|
||||
# does it reach the position in X seconds?
|
||||
# does it reach the position in 'timeout' seconds?
|
||||
loop_freq = 10 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
reached = False
|
||||
for count in xrange(timeout):
|
||||
for i in xrange(timeout * loop_freq):
|
||||
# use MC radius by default
|
||||
# FIXME: also check MAV_TYPE from system status, otherwise pure fixed-wing won't work
|
||||
xy_radius = self.mc_rad
|
||||
@@ -232,70 +346,81 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
if self.is_at_position(lat, lon, alt, xy_radius, z_radius):
|
||||
reached = True
|
||||
rospy.loginfo(
|
||||
"position reached | index: {0}, count: {1}, pos_d: {2}, alt_d: {3}".
|
||||
format(index, count, self.last_pos_d, self.last_alt_d))
|
||||
"position reached | pos_d: {0}, alt_d: {1}, index: {2} | seconds: {3} of {4}".
|
||||
format(self.last_pos_d, self.last_alt_d, index, i /
|
||||
loop_freq, timeout))
|
||||
break
|
||||
|
||||
self.rate.sleep()
|
||||
|
||||
vtol_state_string = "VTOL undefined"
|
||||
|
||||
if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_MC):
|
||||
vtol_state_string = "VTOL MC"
|
||||
if (self.extended_state.vtol_state == ExtendedState.VTOL_STATE_FW):
|
||||
vtol_state_string = "VTOL FW"
|
||||
if (self.extended_state.vtol_state ==
|
||||
ExtendedState.VTOL_STATE_TRANSITION_TO_MC):
|
||||
vtol_state_string = "VTOL FW->MC"
|
||||
if (self.extended_state.vtol_state ==
|
||||
ExtendedState.VTOL_STATE_TRANSITION_TO_FW):
|
||||
vtol_state_string = "VTOL MC->FW"
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(reached, (
|
||||
"({0}) took too long to get to position | lat: {1:13.9f}, lon: {2:13.9f}, alt: {3:6.2f}, xy off: {4}, z off: {5}, timeout: {6}, index: {7}, pos_d: {8}, alt_d: {9}, VTOL state: {10}".
|
||||
"({0}) took too long to get to position | lat: {1:13.9f}, lon: {2:13.9f}, alt: {3:6.2f}, xy off: {4}, z off: {5}, pos_d: {6}, alt_d: {7}, VTOL state: {8}, index: {9} | timeout(seconds): {10}".
|
||||
format(self.mission_name, lat, lon, alt, xy_radius, z_radius,
|
||||
timeout, index, self.last_pos_d, self.last_alt_d,
|
||||
vtol_state_string)))
|
||||
self.last_pos_d, self.last_alt_d,
|
||||
self.VTOL_STATES.get(self.extended_state.vtol_state), index,
|
||||
timeout)))
|
||||
|
||||
def wait_until_ready(self):
|
||||
"""FIXME: hack to wait for simulation to be ready"""
|
||||
rospy.loginfo("waiting for global position")
|
||||
while not self.has_global_pos or math.isnan(
|
||||
self.altitude.amsl) or math.isnan(self.altitude.relative):
|
||||
self.rate.sleep()
|
||||
|
||||
def wait_on_landing(self, timeout, index):
|
||||
"""Wait for landed state"""
|
||||
rospy.loginfo("waiting for landing | timeout: {0}, index: {1}".format(
|
||||
timeout, index))
|
||||
landed = False
|
||||
for count in xrange(timeout):
|
||||
if self.extended_state.landed_state == ExtendedState.LANDED_STATE_ON_GROUND:
|
||||
rospy.loginfo(
|
||||
"landed | index: {0}, count: {1}".format(index, count))
|
||||
landed = True
|
||||
def wait_for_topics(self, timeout):
|
||||
"""wait for simulation to be ready, make sure we're getting topic info
|
||||
from all topics by checking dictionary of flag values set in callbacks,
|
||||
timeout(int): seconds"""
|
||||
rospy.loginfo("waiting for simulation topics to be ready")
|
||||
loop_freq = 1 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
simulation_ready = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if all(value for value in self.sub_topics_ready.values()):
|
||||
simulation_ready = True
|
||||
rospy.loginfo("simulation topics ready | seconds: {0} of {1}".
|
||||
format(i / loop_freq, timeout))
|
||||
break
|
||||
|
||||
self.rate.sleep()
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(landed, (
|
||||
"({0}) landing not detected after landing WP | timeout: {1}, index: {2}".
|
||||
format(self.mission_name, timeout, index)))
|
||||
self.assertTrue(simulation_ready, (
|
||||
"failed to hear from all subscribed simulation topics | topic ready flags: {0} | timeout(seconds): {1}".
|
||||
format(self.sub_topics_ready, timeout)))
|
||||
|
||||
def wait_on_landed_state(self, desired_landed_state, timeout, index):
|
||||
rospy.loginfo(
|
||||
"waiting for landed state | state: {0}, index: {1}".format(
|
||||
self.LAND_STATES.get(desired_landed_state), index))
|
||||
loop_freq = 10 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
landed_state_confirmed = False
|
||||
for i in xrange(timeout * loop_freq):
|
||||
if self.extended_state.landed_state == desired_landed_state:
|
||||
landed_state_confirmed = True
|
||||
rospy.loginfo(
|
||||
"landed state confirmed | state: {0}, index{1}".format(
|
||||
self.LAND_STATES.get(desired_landed_state), index))
|
||||
break
|
||||
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(landed_state_confirmed, (
|
||||
"({0}) landed state not detected | desired: {1}, current: {2} | index: {3}, timeout(seconds): {4}".
|
||||
format(self.mission_name,
|
||||
self.LAND_STATES.get(desired_landed_state),
|
||||
self.LAND_STATES.get(self.extended_state.landed_state),
|
||||
index, timeout)))
|
||||
|
||||
def wait_on_transition(self, transition, timeout, index):
|
||||
"""Wait for VTOL transition"""
|
||||
|
||||
"""Wait for VTOL transition, timeout(int): seconds"""
|
||||
rospy.loginfo(
|
||||
"waiting for VTOL transition | transition: {0}, timeout: {1}, index: {2}".
|
||||
format(transition, timeout, index))
|
||||
"waiting for VTOL transition | transition: {0}, index: {1}".format(
|
||||
self.VTOL_STATES.get(transition), index))
|
||||
loop_freq = 10 # Hz
|
||||
rate = rospy.Rate(loop_freq)
|
||||
transitioned = False
|
||||
for count in xrange(timeout):
|
||||
for i in xrange(timeout * loop_freq):
|
||||
# transition to MC
|
||||
if (transition == ExtendedState.VTOL_STATE_MC and
|
||||
self.extended_state.vtol_state ==
|
||||
ExtendedState.VTOL_STATE_MC):
|
||||
rospy.loginfo("transitioned | index: {0}, count: {1}".format(
|
||||
index, count))
|
||||
rospy.loginfo(
|
||||
"transitioned | index: {0} | seconds: {1} of {2}".format(
|
||||
index, i / loop_freq, timeout))
|
||||
transitioned = True
|
||||
break
|
||||
|
||||
@@ -303,16 +428,17 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
if (transition == ExtendedState.VTOL_STATE_FW and
|
||||
self.extended_state.vtol_state ==
|
||||
ExtendedState.VTOL_STATE_FW):
|
||||
rospy.loginfo("transitioned | index: {0}, count: {1}".format(
|
||||
index, count))
|
||||
rospy.loginfo(
|
||||
"transitioned | index: {0} | seconds: {1} of {2}".format(
|
||||
index, i / loop_freq, timeout))
|
||||
transitioned = True
|
||||
break
|
||||
|
||||
self.rate.sleep()
|
||||
rate.sleep()
|
||||
|
||||
self.assertTrue(transitioned, (
|
||||
"({0}) transition not detected | timeout: {1}, index: {2}".format(
|
||||
self.mission_name, timeout, index)))
|
||||
"({0}) transition not detected | index: {1} | timeout(seconds): {2}, ".
|
||||
format(self.mission_name, index, timeout)))
|
||||
|
||||
#
|
||||
# Test method
|
||||
@@ -346,7 +472,8 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
else:
|
||||
raise IOError('unknown mission file extension', mission_ext)
|
||||
|
||||
self.wait_until_ready()
|
||||
# delay starting the mission
|
||||
self.wait_for_topics(30)
|
||||
|
||||
rospy.loginfo("send mission")
|
||||
result = False
|
||||
@@ -359,19 +486,13 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
result,
|
||||
"({0}) mission could not be transfered".format(self.mission_name))
|
||||
|
||||
rospy.loginfo("set mission mode and arm")
|
||||
while self.state.mode != "AUTO.MISSION" or not self.state.armed:
|
||||
if self.state.mode != "AUTO.MISSION":
|
||||
try:
|
||||
res = self.set_mode_srv(0, 'AUTO.MISSION')
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
if not self.state.armed:
|
||||
try:
|
||||
self.set_arming_srv(True)
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
rospy.sleep(2)
|
||||
# make sure the simulation is ready to start the mission
|
||||
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND, 10, -1)
|
||||
|
||||
rospy.loginfo("seting mission mode")
|
||||
self.set_mode("AUTO.MISSION", 5)
|
||||
rospy.loginfo("arming")
|
||||
self.set_arm(True, 5)
|
||||
|
||||
rospy.loginfo("run mission")
|
||||
for index, waypoint in enumerate(wps):
|
||||
@@ -381,7 +502,7 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
if waypoint.frame == Waypoint.FRAME_GLOBAL_REL_ALT:
|
||||
alt += self.altitude.amsl - self.altitude.relative
|
||||
|
||||
self.reach_position(waypoint.x_lat, waypoint.y_long, alt, 600,
|
||||
self.reach_position(waypoint.x_lat, waypoint.y_long, alt, 60,
|
||||
index)
|
||||
|
||||
# check if VTOL transition happens if applicable
|
||||
@@ -394,23 +515,21 @@ class MavrosMissionTest(unittest.TestCase):
|
||||
if waypoint.command == 85: # VTOL takeoff implies transition to MC
|
||||
transition = ExtendedState.VTOL_STATE_MC
|
||||
|
||||
self.wait_on_transition(transition, 600, index)
|
||||
self.wait_on_transition(transition, 60, index)
|
||||
|
||||
# after reaching position, wait for landing detection if applicable
|
||||
if waypoint.command == 85 or waypoint.command == 21:
|
||||
self.wait_on_landing(600, index)
|
||||
self.wait_on_landed_state(ExtendedState.LANDED_STATE_ON_GROUND,
|
||||
60, index)
|
||||
|
||||
if self.state.armed:
|
||||
try:
|
||||
self.set_arming_srv(False)
|
||||
except rospy.ServiceException as e:
|
||||
rospy.logerr(e)
|
||||
rospy.loginfo("disarming")
|
||||
self.set_arm(False, 5)
|
||||
|
||||
rospy.loginfo("mission done, calculating performance metrics")
|
||||
last_log = get_last_log()
|
||||
rospy.loginfo("log file {0}".format(last_log))
|
||||
data = px4tools.ulog.read_ulog(last_log).concat(dt=0.1)
|
||||
data = px4tools.ulog.compute_data(data)
|
||||
data = px4tools.read_ulog(last_log).concat(dt=0.1)
|
||||
data = px4tools.compute_data(data)
|
||||
res = px4tools.estimator_analysis(data, False)
|
||||
|
||||
# enforce performance
|
||||
|
||||
Reference in New Issue
Block a user