Files
PX4-Autopilot/test/mavsdk_tests/process_helper.py
T
Julian Oes 76750fc8a6 mavsdk_tests: collect buffered up stdout output
Somehow only subprocess.stdout.readline() works at a time. In order not
to miss out on some of the stdout output, we need to collect it all at
the end. Also, we can stop using readline() for processes that have quit
already.
2020-05-29 11:46:14 +02:00

265 lines
8.6 KiB
Python

#!/usr/bin/env python3
import queue
import time
import os
import atexit
import subprocess
import threading
import errno
from typing import Any, Dict, List, TextIO, Optional
class Runner:
def __init__(self,
log_dir: str,
model: str,
case: str,
verbose: bool):
self.name = ""
self.cmd = ""
self.cwd = ""
self.args: List[str]
self.env: Dict[str, str]
self.model = model
self.case = case
self.log_filename = ""
self.log_fd: TextIO
self.verbose = verbose
self.output_queue: queue.Queue[str] = queue.Queue()
self.start_time = time.time()
self.log_dir = log_dir
self.log_filename = ""
self.stop_thread: Any[threading.Event] = None
def set_log_filename(self, log_filename: str) -> None:
self.log_filename = log_filename
def get_log_filename(self) -> str:
return self.log_filename
def start(self) -> None:
if self.verbose:
print("Running: {}".format(" ".join([self.cmd] + self.args)))
atexit.register(self.stop)
if self.verbose:
print("Logging to {}".format(self.log_filename))
self.log_fd = open(self.log_filename, 'w')
self.process = subprocess.Popen(
[self.cmd] + self.args,
cwd=self.cwd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
self.stop_thread = threading.Event()
self.thread = threading.Thread(target=self.process_output)
self.thread.start()
def process_output(self) -> None:
assert self.process.stdout is not None
while True:
line = self.process.stdout.readline()
if not line and \
(self.stop_thread.is_set() or self.poll is not None):
break
if not line or line == "\n":
continue
self.output_queue.put(line)
self.log_fd.write(line)
self.log_fd.flush()
def poll(self) -> Optional[int]:
return self.process.poll()
def wait(self, timeout_min: float) -> Optional[int]:
try:
return self.process.wait(timeout=timeout_min*60)
except subprocess.TimeoutExpired:
print("Timeout of {} min{} reached, stopping...".
format(timeout_min, "s" if timeout_min > 1 else ""))
self.stop()
print("stopped.")
return errno.ETIMEDOUT
def get_output_line(self) -> Optional[str]:
while True:
try:
return self.output_queue.get(block=True, timeout=0.1)
except queue.Empty:
return None
def stop(self) -> int:
atexit.unregister(self.stop)
if not self.stop_thread:
return 0
returncode = self.process.poll()
if returncode is None:
if self.verbose:
print("Terminating {}".format(self.cmd))
self.process.terminate()
try:
returncode = self.process.wait(timeout=1)
except subprocess.TimeoutExpired:
pass
if returncode is None:
if self.verbose:
print("Killing {}".format(self.cmd))
self.process.kill()
returncode = self.process.poll()
if self.verbose:
print("{} exited with {}".format(
self.cmd, self.process.returncode))
self.stop_thread.set()
self.thread.join()
self.log_fd.close()
return self.process.returncode
def time_elapsed_s(self) -> float:
return time.time() - self.start_time
def add_to_env_if_set(self, var: str) -> None:
if var in os.environ:
self.env[var] = os.environ[var]
class Px4Runner(Runner):
def __init__(self, workspace_dir: str, log_dir: str,
model: str, case: str, speed_factor: float,
debugger: str, verbose: bool):
super().__init__(log_dir, model, case, verbose)
self.name = "px4"
self.cmd = workspace_dir + "/build/px4_sitl_default/bin/px4"
self.cwd = workspace_dir + "/build/px4_sitl_default/tmp/rootfs"
self.args = [
workspace_dir + "/ROMFS/px4fmu_common",
"-s",
"etc/init.d-posix/rcS",
"-t",
workspace_dir + "/test_data",
"-d"
]
self.env = {"PATH": str(os.environ['PATH']),
"PX4_SIM_MODEL": self.model,
"PX4_SIM_SPEED_FACTOR": str(speed_factor)}
self.debugger = debugger
if not self.debugger:
pass
elif self.debugger == "valgrind":
self.args = ["--track-origins=yes", "--leak-check=full", "-v",
self.cmd] + self.args
self.cmd = "valgrind"
elif self.debugger == "callgrind":
self.args = ["--tool=callgrind", "-v", self.cmd] + self.args
self.cmd = "valgrind"
elif self.debugger == "gdb":
self.args = ["--args", self.cmd] + self.args
self.cmd = "gdb"
else:
print("Using custom debugger " + self.debugger)
self.args = [self.cmd] + self.args
self.cmd = self.debugger
class GzserverRunner(Runner):
def __init__(self,
workspace_dir: str,
log_dir: str,
model: str,
case: str,
speed_factor: float,
verbose: bool):
super().__init__(log_dir, model, case, verbose)
self.name = "gzserver"
self.cwd = workspace_dir
self.env = {"PATH": os.environ['PATH'],
"HOME": os.environ['HOME'],
"GAZEBO_PLUGIN_PATH":
workspace_dir + "/build/px4_sitl_default/build_gazebo",
"GAZEBO_MODEL_PATH":
workspace_dir + "/Tools/sitl_gazebo/models",
"PX4_SIM_SPEED_FACTOR": str(speed_factor)}
self.add_to_env_if_set("DISPLAY")
self.add_to_env_if_set("PX4_HOME_LAT")
self.add_to_env_if_set("PX4_HOME_LON")
self.add_to_env_if_set("PX4_HOME_ALT")
self.cmd = "gzserver"
self.args = ["--verbose",
workspace_dir + "/Tools/sitl_gazebo/worlds/" +
"empty.world"]
class GzmodelspawnRunner(Runner):
def __init__(self,
workspace_dir: str,
log_dir: str,
model: str,
case: str,
verbose: bool):
super().__init__(log_dir, model, case, verbose)
self.name = "gzmodelspawn"
self.cwd = workspace_dir
self.env = {"PATH": os.environ['PATH'],
"HOME": os.environ['HOME'],
"GAZEBO_PLUGIN_PATH":
workspace_dir + "/build/px4_sitl_default/build_gazebo",
"GAZEBO_MODEL_PATH":
workspace_dir + "/Tools/sitl_gazebo/models"}
self.add_to_env_if_set("DISPLAY")
self.cmd = "gz"
self.args = ["model", "--spawn-file", workspace_dir +
"/Tools/sitl_gazebo/models/" +
self.model + "/" + self.model + ".sdf",
"--model-name", self.model,
"-x", "1.01", "-y", "0.98", "-z", "0.83"]
class GzclientRunner(Runner):
def __init__(self,
workspace_dir: str,
log_dir: str,
model: str,
case: str,
verbose: bool):
super().__init__(log_dir, model, case, verbose)
self.name = "gzclient"
self.cwd = workspace_dir
self.env = {"PATH": os.environ['PATH'],
"HOME": os.environ['HOME'],
"GAZEBO_MODEL_PATH":
workspace_dir + "/Tools/sitl_gazebo/models"}
self.add_to_env_if_set("DISPLAY")
self.cmd = "gzclient"
self.args = ["--verbose"]
class TestRunner(Runner):
def __init__(self,
workspace_dir: str,
log_dir: str,
model: str,
case: str,
mavlink_connection: str,
verbose: bool):
super().__init__(log_dir, model, case, verbose)
self.name = "mavsdk_tests"
self.cwd = workspace_dir
self.env = {"PATH": os.environ['PATH']}
self.cmd = workspace_dir + \
"/build/px4_sitl_default/mavsdk_tests/mavsdk_tests"
self.args = ["--url", mavlink_connection, case]