From 75f1678047e2beb4ec4e1cf7fd383685175d3694 Mon Sep 17 00:00:00 2001 From: Andreas Antener Date: Mon, 23 Feb 2015 18:18:08 +0100 Subject: [PATCH] - updated position control test - added flight path assertion helper --- .../demo_tests/flight_path_assertion.py | 122 ++++++++++++++++++ .../demo_tests/offboard_posctl_test.py | 85 +++++++----- 2 files changed, 177 insertions(+), 30 deletions(-) create mode 100644 integrationtests/demo_tests/flight_path_assertion.py diff --git a/integrationtests/demo_tests/flight_path_assertion.py b/integrationtests/demo_tests/flight_path_assertion.py new file mode 100644 index 0000000000..fb78f89298 --- /dev/null +++ b/integrationtests/demo_tests/flight_path_assertion.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +import sys +import rospy +import threading + +from px4.msg import vehicle_local_position +from gazebo_msgs.srv import SpawnModel +from gazebo_msgs.srv import SetModelState +from geometry_msgs.msg import Pose +from geometry_msgs.msg import Twist + +from numpy import linalg +import numpy as np +import math + +# +# Helper to test if vehicle stays in expected flight path. +# +class FlightPathAssertion(threading.Thread): + + def __init__(self, positions, tunnelRadius = 1, yawOffset = 0.2): + threading.Thread.__init__(self) + rospy.Subscriber("px4_multicopter/vehicle_local_position", vehicle_local_position, self.position_callback) + self.spawn = rospy.ServiceProxy('gazebo/spawn_sdf_model', SpawnModel) + self.setModelState = rospy.ServiceProxy('gazebo/set_model_state', SetModelState) + self.positions = positions + self.tunnelRadius = tunnelRadius + self.yawOffset = yawOffset + self.hasPos = False + self.shouldStop = False + self.center = positions[0] + self.endOfSegment = False + + def position_callback(self, data): + self.hasPos = True + self.localPosition = data + + def spawn_indicator(self): + xml = "true0.7%f1 0 0 0.51 0 0 0.5" % self.tunnelRadius + self.spawn("indicator", xml, "", Pose(), "") + + def position_indicator(self): + state = SetModelState() + state.model_name = "indicator" + pose = Pose() + pose.position.x = self.center[0] + pose.position.y = (-1) * self.center[1] + pose.position.z = (-1) * self.center[2] + state.pose = pose + state.twist = Twist() + state.reference_frame = "" + self.setModelState(state) + + def distance_to_line(self, a, b, pos): + v = b - a + w = pos - a + + c1 = np.dot(w, v) + if c1 <= 0: # before a + self.center = a + return linalg.norm(pos - a) + + c2 = np.dot(v, v) + if c2 <= c1: # after b + self.center = b + self.endOfSegment = True + return linalg.norm(pos - b) + + x = c1 / c2 + l = a + x * v + self.center = l + return linalg.norm(pos - l) + + def stop(self): + self.shouldStop = True + + def run(self): + rate = rospy.Rate(10) # 10hz + self.spawn_indicator() + + current = 0 + + while not self.shouldStop: + if (self.hasPos): + # calculate distance to line segment between first two points + # if distances > tunnelRadius + # exit with error + # advance current pos if not on the line anymore or distance to next point < tunnelRadius + # exit if current pos is now the last position + + self.position_indicator() + + pos = np.array((self.localPosition.x, + self.localPosition.y, + self.localPosition.z)) + aPos = np.array((self.positions[current][0], + self.positions[current][1], + self.positions[current][2])) + bPos = np.array((self.positions[current + 1][0], + self.positions[current + 1][1], + self.positions[current + 1][2])) + + dist = self.distance_to_line(aPos, bPos, pos) + bDist = linalg.norm(pos - bPos) + + rospy.loginfo("distance to line: %f, distance to end: %f" % (dist, bDist)) + + if (dist > self.tunnelRadius): + rospy.logerr("left tunnel at position (%f, %f, %f)" % (self.localPosition.x, self.localPosition.y, self.localPosition.z)) + # FIXME: assertion + break + + if (self.endOfSegment or bDist < self.tunnelRadius): + rospy.loginfo("next segment") + self.endOfSegment = False + current = current + 1 + + if (current == len(self.positions) - 1): + rospy.loginfo("no more positions") + break + + rate.sleep() diff --git a/integrationtests/demo_tests/offboard_posctl_test.py b/integrationtests/demo_tests/offboard_posctl_test.py index 0d56082459..af8d4b821b 100755 --- a/integrationtests/demo_tests/offboard_posctl_test.py +++ b/integrationtests/demo_tests/offboard_posctl_test.py @@ -5,6 +5,9 @@ import sys import unittest import rospy +from numpy import linalg +import numpy as np + from px4.msg import vehicle_local_position from px4.msg import vehicle_control_mode from px4.msg import actuator_armed @@ -14,10 +17,18 @@ from sensor_msgs.msg import Joy from std_msgs.msg import Header from manual_input import ManualInput +from flight_path_assertion import FlightPathAssertion class OffboardPosctlTest(unittest.TestCase): + def setUp(self): + rospy.init_node('test_node', anonymous=True) + rospy.Subscriber('px4_multicopter/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) + rospy.Subscriber("px4_multicopter/vehicle_local_position", vehicle_local_position, self.position_callback) + self.pubSpt = rospy.Publisher('px4_multicopter/position_setpoint_triplet', position_setpoint_triplet, queue_size=10) + self.rate = rospy.Rate(10) # 10hz + # # General callback functions used in tests # @@ -33,19 +44,37 @@ class OffboardPosctlTest(unittest.TestCase): # Helper methods # def is_at_position(self, x, y, z, offset): - rospy.loginfo("current position %f, %f, %f" % (self.localPosition.x, self.localPosition.y, self.localPosition.z)) - return self.localPosition.z > (z - offset) and self.localPosition.z < (z + offset) + rospy.logdebug("current position %f, %f, %f" % (self.localPosition.x, self.localPosition.y, self.localPosition.z)) + desired = np.array((x, y, z)) + pos = np.array((self.localPosition.x, self.localPosition.y, self.localPosition.z)) + return linalg.norm(desired - pos) < offset + + def reach_position(self, x, y, z, timeout): + # set a position setpoint + pos = position_setpoint() + pos.valid = True + pos.x = x + pos.y = y + pos.z = z + pos.position_valid = True + stp = position_setpoint_triplet() + stp.current = pos + self.pubSpt.publish(stp) + + # does it reach the position in X seconds? + count = 0 + while(count < timeout): + if(self.is_at_position(pos.x, pos.y, pos.z, 0.5)): + break + count = count + 1 + self.rate.sleep() + + self.assertTrue(count < timeout, "took too long to get to position") # # Test offboard POSCTL # def test_posctl(self): - rospy.init_node('test_node', anonymous=True) - rospy.Subscriber('px4_multicopter/vehicle_control_mode', vehicle_control_mode, self.vehicle_control_mode_callback) - rospy.Subscriber("px4_multicopter/vehicle_local_position", vehicle_local_position, self.position_callback) - pubSpt = rospy.Publisher('px4_multicopter/position_setpoint_triplet', position_setpoint_triplet, queue_size=10) - rate = rospy.Rate(10) # 10hz - manIn = ManualInput() # arm and go into offboard @@ -55,42 +84,38 @@ class OffboardPosctlTest(unittest.TestCase): self.assertTrue(self.controlMode.flag_control_offboard_enabled, "flag_control_offboard_enabled is not set") self.assertTrue(self.controlMode.flag_control_position_enabled, "flag_control_position_enabled is not set") - # set a position setpoint - pos = position_setpoint() - pos.valid = True - pos.x = 2 - pos.z = -2 - pos.y = 2 - pos.position_valid = True - stp = position_setpoint_triplet() - stp.current = pos - pubSpt.publish(stp) + # prepare flight path assertion + fpa = FlightPathAssertion( + ( + (0,0,0), + (2,2,-2), + (2,-2,-2), + (-2,-2,-2), + (2,2,-2), + ), 0.5, 0) + fpa.start() - # does it reach the position in X seconds? - count = 0 - timeout = 120 - while(count < timeout): - if(self.is_at_position(pos.x, pos.y, pos.z, 0.5)): - break - count = count + 1 - rate.sleep() - - self.assertTrue(count < timeout, "took too long to get to position") + self.reach_position(2, 2, -2, 120) + self.reach_position(2, -2, -2, 120) + self.reach_position(-2, -2, -2, 120) + self.reach_position(2, 2, -2, 120) # does it hold the position for Y seconds? positionHeld = True count = 0 timeout = 50 while(count < timeout): - if(not self.is_at_position(pos.x, pos.y, pos.z, 0.5)): + if(not self.is_at_position(2, 2, -2, 0.5)): positionHeld = False break count = count + 1 - rate.sleep() + self.rate.sleep() self.assertTrue(count == timeout, "position could not be held") + fpa.stop() if __name__ == '__main__': import rostest rostest.rosrun(PKG, 'posctl_test', OffboardPosctlTest) + #unittest.main()