mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-04-14 10:07:39 +08:00
Tools/mavlink_shell.py: implement a simple shell with history
This commit is contained in:
parent
aefa319fc4
commit
c22e7ed5c9
@ -9,6 +9,7 @@ Open a shell over MAVLink.
|
||||
|
||||
from __future__ import print_function
|
||||
import sys, select
|
||||
import termios
|
||||
|
||||
try:
|
||||
from pymavlink import mavutil
|
||||
@ -118,24 +119,72 @@ def main():
|
||||
|
||||
mav_serialport.write('\n') # make sure the shell is started
|
||||
|
||||
# setup the console, so we can read one char at a time
|
||||
fd_in = sys.stdin.fileno()
|
||||
old_attr = termios.tcgetattr(fd_in)
|
||||
new_attr = termios.tcgetattr(fd_in)
|
||||
new_attr[3] = new_attr[3] & ~termios.ECHO # lflags
|
||||
new_attr[3] = new_attr[3] & ~termios.ICANON
|
||||
|
||||
try:
|
||||
termios.tcsetattr(fd_in, termios.TCSANOW, new_attr)
|
||||
cur_line = ''
|
||||
command_history = []
|
||||
cur_history_index = 0
|
||||
|
||||
def erase_last_n_chars(N):
|
||||
if N == 0: return
|
||||
CURSOR_BACK_N = '\x1b['+str(N)+'D'
|
||||
ERASE_END_LINE = '\x1b[K'
|
||||
sys.stdout.write(CURSOR_BACK_N + ERASE_END_LINE)
|
||||
|
||||
while True:
|
||||
i, o, e = select.select([sys.stdin], [], [], 0.001)
|
||||
if (i):
|
||||
data = sys.stdin.readline()
|
||||
while True:
|
||||
i, o, e = select.select([sys.stdin], [], [], 0)
|
||||
if not i: break
|
||||
ch = sys.stdin.read(1)
|
||||
|
||||
# Erase stdin output (otherwise we have it twice)
|
||||
# FIXME: this assumes a prompt of 5 chars, and does not work in all cases
|
||||
CURSOR_UP_ONE = '\x1b[1A'
|
||||
CURSOR_RIGHT = '\x1b[5C'
|
||||
ERASE_LINE = '\x1b[2K'
|
||||
ERASE_END_LINE = '\x1b[K'
|
||||
sys.stdout.write(CURSOR_UP_ONE + CURSOR_RIGHT + ERASE_END_LINE)
|
||||
# provide a simple shell with command history
|
||||
if ch == '\n':
|
||||
if len(cur_line) > 0:
|
||||
# erase current text (mavlink shell will echo it as well)
|
||||
erase_last_n_chars(len(cur_line))
|
||||
|
||||
# TODO: add a command history?
|
||||
# add to history
|
||||
if len(command_history) == 0 or command_history[-1] != cur_line:
|
||||
command_history.append(cur_line)
|
||||
if len(command_history) > 50:
|
||||
del command_history[0]
|
||||
cur_history_index = len(command_history)
|
||||
mav_serialport.write(cur_line+'\n')
|
||||
cur_line = ''
|
||||
elif ord(ch) == 127: # backslash
|
||||
if len(cur_line) > 0:
|
||||
erase_last_n_chars(1)
|
||||
cur_line = cur_line[:-1]
|
||||
sys.stdout.write(ch)
|
||||
elif ord(ch) == 033:
|
||||
ch = sys.stdin.read(1) # skip one
|
||||
ch = sys.stdin.read(1)
|
||||
if ch == 'A': # arrow up
|
||||
if cur_history_index > 0:
|
||||
cur_history_index -= 1
|
||||
elif ch == 'B': # arrow down
|
||||
if cur_history_index < len(command_history):
|
||||
cur_history_index += 1
|
||||
# TODO: else: support line editing
|
||||
|
||||
#sys.stdout.write(data)
|
||||
mav_serialport.write(data)
|
||||
erase_last_n_chars(len(cur_line))
|
||||
if cur_history_index == len(command_history):
|
||||
cur_line = ''
|
||||
else:
|
||||
cur_line = command_history[cur_history_index]
|
||||
sys.stdout.write(cur_line)
|
||||
|
||||
elif ord(ch) > 3:
|
||||
cur_line += ch
|
||||
sys.stdout.write(ch)
|
||||
sys.stdout.flush()
|
||||
|
||||
data = mav_serialport.read(4096)
|
||||
if data and len(data) > 0:
|
||||
@ -148,6 +197,9 @@ def main():
|
||||
except KeyboardInterrupt:
|
||||
mav_serialport.close()
|
||||
|
||||
finally:
|
||||
termios.tcsetattr(fd_in, termios.TCSADRAIN, old_attr)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user