mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-06-26 03:30:35 +08:00
wip: squashed patch
This commit is contained in:
Executable
+97
@@ -0,0 +1,97 @@
|
||||
#!/bin/bash
|
||||
# Flash PX4 to a device running AOS in the local network
|
||||
if [ "$1" == "-h" ] || [ "$1" == "--help" ] || [ $# -lt 2 ]; then
|
||||
echo "Usage: $0 -f <firmware.px4|.elf> [-c <configuration_dir>] -d <IP/Device> [-u <user>] [-p <ssh_port>]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
ssh_port=22
|
||||
ssh_user=root
|
||||
|
||||
while getopts ":f:c:d:p:u:" opt; do
|
||||
case ${opt} in
|
||||
f )
|
||||
if [ -n "$OPTARG" ]; then
|
||||
firmware_file="$OPTARG"
|
||||
else
|
||||
echo "ERROR: -f requires a non-empty option argument."
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
c )
|
||||
if [ -f "$OPTARG/rc.autostart" ]; then
|
||||
config_dir="$OPTARG"
|
||||
else
|
||||
echo "ERROR: -c configuration directory is empty or does not contain a valid rc.autostart"
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
d )
|
||||
if [ "$OPTARG" ]; then
|
||||
device="$OPTARG"
|
||||
else
|
||||
echo "ERROR: -d requires a non-empty option argument."
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
p )
|
||||
if [[ "$OPTARG" =~ ^[0-9]+$ ]]; then
|
||||
ssh_port="$OPTARG"
|
||||
else
|
||||
echo "ERROR: -p ssh_port must be a number."
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
u )
|
||||
if [ "$OPTARG" ]; then
|
||||
ssh_user="$OPTARG"
|
||||
else
|
||||
echo "ERROR: -u requires a non-empty option argument."
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
target_dir=/shared_container_dir/fmu
|
||||
|
||||
# create update-dev.tar
|
||||
target_file_name="update-dev.tar"
|
||||
tmp_dir="$(mktemp -d)"
|
||||
config_path=""
|
||||
firmware_path=""
|
||||
|
||||
if [ -d "$config_dir" ]; then
|
||||
cp -r "$config_dir" "$tmp_dir/config"
|
||||
config_path=config
|
||||
fi
|
||||
|
||||
if [ -f "$firmware_file" ]; then
|
||||
extension="${firmware_file##*.}"
|
||||
cp "$firmware_file" "$tmp_dir/firmware.$extension"
|
||||
if [ "$extension" == "elf" ]; then
|
||||
# ensure the file is stripped to reduce file size
|
||||
arm-none-eabi-strip "$tmp_dir/firmware.$extension"
|
||||
fi
|
||||
firmware_path="firmware.$extension"
|
||||
fi
|
||||
|
||||
if [ -z "$device" ]; then
|
||||
echo "Error: missing device"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
pushd "$tmp_dir" &>/dev/null
|
||||
|
||||
if [ -z $firmware_path ] && [ -z $config_path ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
tar -C "$tmp_dir" --sort=name --owner=root:0 --group=root:0 --mtime='2019-01-01 00:00:00' -cvf $target_file_name $firmware_path $config_path
|
||||
scp -P $ssh_port "$target_file_name" $ssh_user@"$device":$target_dir
|
||||
popd &>/dev/null
|
||||
rm -rf "$tmp_dir"
|
||||
|
||||
# grab status output
|
||||
cmd="tail --follow=name $target_dir/update_status 2>/dev/null || true"
|
||||
ssh -t -p $ssh_port $ssh_user@$device "$cmd"
|
||||
Executable
+36
@@ -0,0 +1,36 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
DIR="$(dirname $(readlink -f $0))"
|
||||
PX4_BINARY_FILE="$1"
|
||||
DEFAULT_AUTOPILOT_HOST=10.41.0.1
|
||||
DEFAULT_AUTOPILOT_PORT=33333
|
||||
DEFAULT_AUTOPILOT_USER=auterion
|
||||
|
||||
for i in "$@"
|
||||
do
|
||||
case $i in
|
||||
--default-ip=*)
|
||||
DEFAULT_AUTOPILOT_HOST="${i#*=}"
|
||||
;;
|
||||
--default-port=*)
|
||||
DEFAULT_AUTOPILOT_PORT="${i#*=}"
|
||||
;;
|
||||
--default-user=*)
|
||||
DEFAULT_AUTOPILOT_USER="${i#*=}"
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
# allow these to be overridden
|
||||
[ -z "$AUTOPILOT_HOST" ] && AUTOPILOT_HOST=$DEFAULT_AUTOPILOT_HOST
|
||||
[ -z "$AUTOPILOT_PORT" ] && AUTOPILOT_PORT=$DEFAULT_AUTOPILOT_PORT
|
||||
[ -z "$AUTOPILOT_USER" ] && AUTOPILOT_USER=$DEFAULT_AUTOPILOT_USER
|
||||
|
||||
echo "Uploading to $AUTOPILOT_HOST..."
|
||||
|
||||
"$DIR"/remote_update_fmu.sh -f "$PX4_BINARY_FILE" -d "$AUTOPILOT_HOST" -p $AUTOPILOT_PORT -u $AUTOPILOT_USER
|
||||
|
||||
exit 0
|
||||
@@ -6,10 +6,17 @@
|
||||
# For example gazebo can be run like this:
|
||||
#./Tools/gazebo_sitl_multiple_run.sh -n 10 -m iris
|
||||
|
||||
SUPPORTED_MODELS=("iris" "plane" "standard_vtol" "rover" "r1_rover" "typhoon_h480", "shieldai_nova2", "skydio_x2d")
|
||||
|
||||
function cleanup() {
|
||||
pkill -x px4
|
||||
pkill gzclient
|
||||
pkill gzserver
|
||||
|
||||
if [[ -n "$HEADLESS" ]]; then
|
||||
exit
|
||||
else
|
||||
pkill gzclient
|
||||
fi
|
||||
}
|
||||
|
||||
function spawn_model() {
|
||||
@@ -20,10 +27,9 @@ function spawn_model() {
|
||||
X=${X:=0.0}
|
||||
Y=${Y:=$((3*${N}))}
|
||||
|
||||
SUPPORTED_MODELS=("iris" "plane" "standard_vtol" "rover" "r1_rover" "typhoon_h480")
|
||||
if [[ " ${SUPPORTED_MODELS[*]} " != *"$MODEL"* ]];
|
||||
then
|
||||
echo "ERROR: Currently only vehicle model $MODEL is not supported!"
|
||||
echo "ERROR: Currently vehicle model '$MODEL' is not supported!"
|
||||
echo " Supported Models: [${SUPPORTED_MODELS[@]}]"
|
||||
trap "cleanup" SIGINT SIGTERM EXIT
|
||||
exit 1
|
||||
@@ -34,8 +40,19 @@ function spawn_model() {
|
||||
|
||||
pushd "$working_dir" &>/dev/null
|
||||
echo "starting instance $N in $(pwd)"
|
||||
|
||||
if [[ -n "${PX4_VIDEO_HOST_IP}" ]]; then
|
||||
export PX4_VIDEO_HOST_IP=${PX4_VIDEO_HOST_IP%.*}.$((7+$N))
|
||||
echo "PX4_VIDEO_HOST_IP '$PX4_VIDEO_HOST_IP'"
|
||||
fi
|
||||
|
||||
if [[ -n "${PX4_SIM_REMOTE_HOST}" ]]; then
|
||||
export PX4_SIM_REMOTE_HOST=${PX4_SIM_REMOTE_HOST%.*}.$((7+$N))
|
||||
echo "PX4_SIM_REMOTE_HOST '$PX4_SIM_REMOTE_HOST'"
|
||||
fi
|
||||
|
||||
../bin/px4 -i $N -d "$build_path/etc" -w sitl_${MODEL}_${N} -s etc/init.d-posix/rcS >out.log 2>err.log &
|
||||
python3 ${src_path}/Tools/sitl_gazebo/scripts/jinja_gen.py ${src_path}/Tools/sitl_gazebo/models/${MODEL}/${MODEL}.sdf.jinja ${src_path}/Tools/sitl_gazebo --mavlink_tcp_port $((4560+${N})) --mavlink_udp_port $((14560+${N})) --mavlink_id $((1+${N})) --gst_udp_port $((5600+${N})) --video_uri $((5600+${N})) --mavlink_cam_udp_port $((14530+${N})) --output-file /tmp/${MODEL}_${N}.sdf
|
||||
python3 ${src_path}/Tools/sitl_gazebo/scripts/jinja_gen.py ${src_path}/Tools/sitl_gazebo/models/${MODEL}/${MODEL}.sdf.jinja ${src_path}/Tools/sitl_gazebo --mavlink_tcp_port $((4560+${N})) --mavlink_udp_port $((14560+${N})) --mavlink_id $((1+${N})) --gst_udp_host 172.5.0.$((7+${N})) --gst_udp_port $((5600)) --video_uri $((5600+${N})) --mavlink_cam_udp_port $((14530+${N})) --udp_onboard_gimbal_port_local $((13030+${N})) --output-file /tmp/${MODEL}_${N}.sdf --vehicle_id ${N}
|
||||
|
||||
echo "Spawning ${MODEL}_${N} at ${X} ${Y}"
|
||||
|
||||
@@ -47,12 +64,13 @@ function spawn_model() {
|
||||
|
||||
if [ "$1" == "-h" ] || [ "$1" == "--help" ]
|
||||
then
|
||||
echo "Usage: $0 [-n <num_vehicles>] [-m <vehicle_model>] [-w <world>] [-s <script>]"
|
||||
echo "Usage: $0 [-n <num_vehicles>] [-m <vehicle_model>] [-w <world>] [-s <script>] [-c <custom_models>]"
|
||||
echo "-s flag is used to script spawning vehicles e.g. $0 -s iris:3,plane:2"
|
||||
echo "-c flag is used to allow custom models e.g. $0 -s \"my_plane my_plane_2\""
|
||||
exit 1
|
||||
fi
|
||||
|
||||
while getopts n:m:w:s:t:l: option
|
||||
while getopts n:m:w:s:t:c: option
|
||||
do
|
||||
case "${option}"
|
||||
in
|
||||
@@ -61,7 +79,7 @@ do
|
||||
w) WORLD=${OPTARG};;
|
||||
s) SCRIPT=${OPTARG};;
|
||||
t) TARGET=${OPTARG};;
|
||||
l) LABEL=_${OPTARG};;
|
||||
c) CUSTOM_MODELS=${OPTARG};;
|
||||
esac
|
||||
done
|
||||
|
||||
@@ -71,6 +89,8 @@ target=${TARGET:=px4_sitl_default}
|
||||
vehicle_model=${VEHICLE_MODEL:="iris"}
|
||||
export PX4_SIM_MODEL=${vehicle_model}
|
||||
|
||||
SUPPORTED_MODELS+=(${CUSTOM_MODELS})
|
||||
|
||||
echo ${SCRIPT}
|
||||
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
src_path="$SCRIPT_DIR/.."
|
||||
@@ -132,9 +152,19 @@ else
|
||||
n=$(($n + 1))
|
||||
done
|
||||
done
|
||||
|
||||
fi
|
||||
trap "cleanup" SIGINT SIGTERM EXIT
|
||||
|
||||
echo "Starting gazebo client"
|
||||
gzclient
|
||||
if [[ -n "$HEADLESS" ]]; then
|
||||
trap "cleanup" SIGINT SIGTERM
|
||||
|
||||
while :
|
||||
do
|
||||
|
||||
sleep 5
|
||||
done
|
||||
else
|
||||
trap "cleanup" SIGINT SIGTERM EXIT
|
||||
|
||||
echo "Starting gazebo client"
|
||||
gzclient
|
||||
fi
|
||||
|
||||
@@ -102,7 +102,7 @@ class ModuleDocumentation(object):
|
||||
def _handle_usage_param_int(self, args):
|
||||
assert(len(args) == 6) # option_char, default_val, min_val, max_val, description, is_optional
|
||||
option_char = self._get_option_char(args[0])
|
||||
default_val = int(args[1], 0)
|
||||
default_val = self._get_int(args[1])
|
||||
description = self._get_string(args[4])
|
||||
if self._is_bool_true(args[5]):
|
||||
self._usage_string += " [-%s <val>] %s\n" % (option_char, description)
|
||||
@@ -214,6 +214,9 @@ class ModuleDocumentation(object):
|
||||
f = f[:-1]
|
||||
return float(f)
|
||||
|
||||
def _get_int(self, argument):
|
||||
return int(eval(argument))
|
||||
|
||||
def _is_string(self, argument):
|
||||
return len(argument) > 0 and argument[0] == '"'
|
||||
|
||||
@@ -307,6 +310,8 @@ class SourceParser(object):
|
||||
r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
|
||||
re.DOTALL | re.MULTILINE)
|
||||
|
||||
self._define_pattern = re.compile(r'#define\s+(\w+?)[^\S\r\n]+(.+?)\s*?\n')
|
||||
|
||||
def Parse(self, scope, contents):
|
||||
"""
|
||||
Incrementally parse program contents and append all found documentations
|
||||
@@ -316,6 +321,9 @@ class SourceParser(object):
|
||||
# remove comments from source
|
||||
contents = self._comment_remover(contents)
|
||||
|
||||
# replace preprocessor defines defined in file directly
|
||||
contents = self._define_replacer(contents)
|
||||
|
||||
extracted_function_calls = [] # list of tuples: (FUNC_NAME, list(ARGS))
|
||||
|
||||
start_index = 0
|
||||
@@ -379,6 +387,15 @@ class SourceParser(object):
|
||||
return s
|
||||
return re.sub(self._comment_remove_pattern, replacer, text)
|
||||
|
||||
def _define_replacer(self, text):
|
||||
""" check for C preprocesor #define in text and replace with argument"""
|
||||
text = re.sub(r"\\\s*?\n"," ",text)
|
||||
define_iter = self._define_pattern.finditer(text)
|
||||
for define_pattern in define_iter:
|
||||
text = re.sub(r"\b" +re.escape(str(define_pattern.groups()[0])) + r"\b", re.escape(str(define_pattern.groups()[1])), text)
|
||||
return text
|
||||
|
||||
|
||||
def _do_consistency_check(self, contents, scope, module_doc):
|
||||
"""
|
||||
check the documentation for consistency with the code (arguments to
|
||||
|
||||
+21
-3
@@ -570,7 +570,7 @@ class uploader(object):
|
||||
self.fw_maxsize = self.__getInfo(uploader.INFO_FLASH_SIZE)
|
||||
|
||||
# upload the firmware
|
||||
def upload(self, fw, force=False, boot_delay=None):
|
||||
def upload(self, fw, force=False, boot_delay=None, boot_check=False):
|
||||
# Make sure we are doing the right thing
|
||||
start = time.time()
|
||||
if self.board_type != fw.property('board_id'):
|
||||
@@ -603,9 +603,9 @@ class uploader(object):
|
||||
print("sn: ", end='')
|
||||
for byte in range(0, 12, 4):
|
||||
x = self.__getSN(byte)
|
||||
x = x[::-1] # reverse the bytes
|
||||
self.sn = self.sn + x
|
||||
print(binascii.hexlify(x).decode('Latin-1'), end='') # show user
|
||||
|
||||
print(binascii.hexlify(self.sn).decode('Latin-1'), end='') # show user
|
||||
print('')
|
||||
print("chip: %08x" % self.__getCHIP())
|
||||
|
||||
@@ -667,6 +667,23 @@ class uploader(object):
|
||||
|
||||
print("\nRebooting.", end='')
|
||||
self.__reboot()
|
||||
|
||||
if boot_check:
|
||||
# check if application has booted
|
||||
time.sleep(self.BOOT_CHECK_DELAY_s)
|
||||
# this Sync is expected to fail, because jump to Px4 succeeded
|
||||
btl_active = False
|
||||
try:
|
||||
# Test if bootloader answers
|
||||
ret =self.__sync()
|
||||
btl_active=True
|
||||
|
||||
except Exception as e :
|
||||
btl_active = False
|
||||
|
||||
if btl_active:
|
||||
raise Exception('Did not boot to PX4 Application, still in Bootloader')
|
||||
|
||||
self.port.close()
|
||||
print(" Elapsed Time %3.3f\n" % (time.time() - start))
|
||||
|
||||
@@ -752,6 +769,7 @@ def main():
|
||||
parser.add_argument('--force', action='store_true', default=False, help='Override board type check, or silicon errata checks and continue loading')
|
||||
parser.add_argument('--boot-delay', type=int, default=None, help='minimum boot delay to store in flash')
|
||||
parser.add_argument('--use-protocol-splitter-format', action='store_true', help='use protocol splitter format for reboot')
|
||||
parser.add_argument('--boot-check', action='store_true',default=False, help='Test if bootloader has exited, after boot. Throws an excpetion if bootloader does not jump to application')
|
||||
parser.add_argument('firmware', action="store", help="Firmware file to be uploaded")
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
@@ -17,8 +17,9 @@ BUILD_DIR=$2
|
||||
|
||||
# setup Gazebo env and update package path
|
||||
export GAZEBO_PLUGIN_PATH=$GAZEBO_PLUGIN_PATH:${BUILD_DIR}/build_gazebo
|
||||
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:${SRC_DIR}/Tools/sitl_gazebo/models
|
||||
export GAZEBO_MODEL_PATH=$GAZEBO_MODEL_PATH:${SRC_DIR}/Tools/sitl_gazebo/models:${SRC_DIR}/Tools/auterion/models
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${BUILD_DIR}/build_gazebo
|
||||
export GAZEBO_MODEL_DATABASE_URI=http://simulation-models.tools.auterion.dev/
|
||||
|
||||
echo -e "GAZEBO_PLUGIN_PATH $GAZEBO_PLUGIN_PATH"
|
||||
echo -e "GAZEBO_MODEL_PATH $GAZEBO_MODEL_PATH"
|
||||
|
||||
@@ -0,0 +1,349 @@
|
||||
#!/usr/bin/env python3
|
||||
from os import EX_CANTCREAT
|
||||
import nacl.encoding
|
||||
import nacl.signing
|
||||
import nacl.hash
|
||||
import struct
|
||||
import zlib
|
||||
import json
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
import cryptotools
|
||||
import argparse
|
||||
import os
|
||||
|
||||
# Dictionary describing the possible TOC flags, they can be OR'ed together
|
||||
toc_flag_dict = {
|
||||
'TOC_FLAG1_BOOT': 0x1,
|
||||
'TOC_FLAG1_VTORS': 0x2,
|
||||
'TOC_FLAG1_CHECK_SIGNATURE': 0x4,
|
||||
'TOC_FLAG1_DECRYPT': 0x8,
|
||||
'TOC_FLAG1_COPY': 0x10,
|
||||
'TOC_FLAG1_RDCT': 0x80,
|
||||
}
|
||||
|
||||
# Dataclasses describing the TOC data, used to parse the to and from binary
|
||||
|
||||
|
||||
@dataclass
|
||||
class TOC_start:
|
||||
start_magic: str = None
|
||||
version: int = None
|
||||
STRUCT_STRUCTURE = "<4sI" # type defininig string from the STRUCT package
|
||||
|
||||
# Dataclass to describe the TOC for 2 entries, one describing the signature typen
|
||||
# and the second one describing the signature location.
|
||||
|
||||
|
||||
@dataclass
|
||||
class TOC_entry:
|
||||
toc_position: int = 0
|
||||
app_name: str = None
|
||||
app_start: int = None
|
||||
app_end: int = None
|
||||
app_target: int = None
|
||||
app_signature_idx: int = None
|
||||
app_signature_key: int = None
|
||||
app_encryption_key: int = None
|
||||
app_flags1: int = None
|
||||
app_reserved: int = None
|
||||
sig_name: str = None
|
||||
sig_start: int = None
|
||||
sig_end: int = None
|
||||
sig_target: int = None
|
||||
sig_signature_idx: int = None
|
||||
sig_signature_key: int = None
|
||||
sig_encryption_key: int = None
|
||||
sig_flags1: int = None
|
||||
sig_reserved: int = None
|
||||
# type defininig string from the STRUCT package
|
||||
STRUCT_STRUCTURE = "<4sIIIBBBBI4sIIIBBBBI"
|
||||
|
||||
|
||||
def toc2bin(data):
|
||||
'''
|
||||
Takes as TOC data class and converts it to the binary representation.
|
||||
Prepare a TOC_entry data class with values and hand it over to this function.
|
||||
|
||||
data: is a dataclass TOC_entry
|
||||
retrun: a packed binary to add to the px4 bin file
|
||||
'''
|
||||
return struct.pack(data.STRUCT_STRUCTURE,
|
||||
data.app_name, data.app_start,
|
||||
data.app_end, data.app_target,
|
||||
data.app_signature_idx,
|
||||
data.app_signature_key,
|
||||
data.app_encryption_key,
|
||||
data.app_flags1,
|
||||
data.app_reserved,
|
||||
data.sig_name, data.sig_start,
|
||||
data.sig_end, data.sig_target,
|
||||
data.sig_signature_idx,
|
||||
data.sig_signature_key,
|
||||
data.sig_encryption_key,
|
||||
data.sig_flags1,
|
||||
data.sig_reserved)
|
||||
|
||||
|
||||
def bin2toc_start(bin):
|
||||
'''
|
||||
Takes binary data and unpacks a Toc_start header.
|
||||
|
||||
bin: binary data to parse for the TOC header.
|
||||
|
||||
return: a dataclass TOC_start
|
||||
'''
|
||||
data = TOC_start()
|
||||
(data.start_magic, data.version) = struct.unpack(data.STRUCT_STRUCTURE, bin)
|
||||
return data
|
||||
|
||||
|
||||
def bin2toc_entry(bin):
|
||||
'''
|
||||
Takes binary data and unpacks it into a TOC entry dataclass.
|
||||
bin: Binary data to unpack the TOC entry from.
|
||||
|
||||
return: A dataclass of TOC_entry
|
||||
'''
|
||||
|
||||
data = TOC_entry()
|
||||
(data.app_name, data.app_start,
|
||||
data.app_end, data.app_target,
|
||||
data.app_signature_idx,
|
||||
data.app_signature_key,
|
||||
data.app_encryption_key,
|
||||
data.app_flags1,
|
||||
data.app_reserved,
|
||||
data.sig_name, data.sig_start,
|
||||
data.sig_end, data.sig_target,
|
||||
data.sig_signature_idx,
|
||||
data.sig_signature_key,
|
||||
data.sig_encryption_key,
|
||||
data.sig_flags1,
|
||||
data.sig_reserved
|
||||
) = struct.unpack(data.STRUCT_STRUCTURE, bin)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def parse_toc(bin):
|
||||
'''
|
||||
Searches for the TOC in the binary data.
|
||||
This function looks for a TOC within certain boundaries in the TOC. It
|
||||
also checks to find a valid TOC_end.
|
||||
Throws exceptions, if a toc is not found, wrong version or end is not found.
|
||||
|
||||
bin: Binary data to look for the TOC
|
||||
|
||||
return: dataclass with the TOC entry
|
||||
'''
|
||||
|
||||
# This is a fixed address from the linker file, TOC is placed after.
|
||||
BOOT_DELAY_ADDR = 0x200
|
||||
TOC_LEN_MAX = 64
|
||||
EXPECTED_TOC_VERSION = 1
|
||||
|
||||
toc_start_len = struct.calcsize(TOC_start().STRUCT_STRUCTURE)
|
||||
toc_entry_len = struct.calcsize(TOC_entry().STRUCT_STRUCTURE)
|
||||
|
||||
start_indx = bin.find(b'TOC', BOOT_DELAY_ADDR,
|
||||
BOOT_DELAY_ADDR + TOC_LEN_MAX)
|
||||
if start_indx <= 0:
|
||||
raise Exception('TOC not found')
|
||||
|
||||
toc_start_header = bin2toc_start(
|
||||
bin[start_indx:(start_indx+toc_start_len)])
|
||||
|
||||
print('TOC start found', toc_start_header,'@: ',hex(start_indx))
|
||||
|
||||
if toc_start_header.version != EXPECTED_TOC_VERSION:
|
||||
raise Exception('Wrong TOC version!')
|
||||
|
||||
t = bin2toc_entry(
|
||||
bin[start_indx+toc_start_len: (start_indx+toc_start_len+toc_entry_len)])
|
||||
t.toc_position = start_indx+toc_start_len
|
||||
print(t)
|
||||
|
||||
indx = bin.find(b'END', start_indx, start_indx + 512)
|
||||
if indx <= 0:
|
||||
toc_end = False
|
||||
raise Exception('TOC end not found')
|
||||
|
||||
return t
|
||||
|
||||
|
||||
def unpackPx4(file_path):
|
||||
'''
|
||||
Unpacks a .px4 file to get access to its binary data.
|
||||
|
||||
filepath: Path to a px4 file to extract.
|
||||
|
||||
return: A tuple of (binary data, json_data) of the file.
|
||||
|
||||
'''
|
||||
# read the file
|
||||
with open(file_path, "r") as f:
|
||||
desc = json.load(f)
|
||||
image = bytearray(zlib.decompress(base64.b64decode(desc['image'])))
|
||||
return image, desc
|
||||
|
||||
|
||||
def packPx4(bin_image, json_data, file_path, pub_key):
|
||||
'''
|
||||
Packs a new .px4 file with given binary data and information from jason file.
|
||||
|
||||
bin_image: New binary data with signature to add to .px4 file
|
||||
json_data: The json data from the previously parsed .px4.
|
||||
file_path: File path to then new .px4.sec image.
|
||||
|
||||
return: Nothing
|
||||
'''
|
||||
|
||||
head,tail=os.path.splitext(file_path)
|
||||
with open(head +'_signed.px4', 'w') as f:
|
||||
json_data['signed'] = 'Hash512_Ed25519'
|
||||
json_data['pub_key'] = pub_key.decode('utf-8')
|
||||
json_data['image_size'] = len(bin_image)
|
||||
json_data['image'] = base64.b64encode(
|
||||
zlib.compress(bin_image, 9)).decode('utf-8')
|
||||
print('Pack new signed.px4 file with signature')
|
||||
json.dump(json_data, f, indent=4)
|
||||
|
||||
|
||||
def ed25519_sign(private_key, signee_bin):
|
||||
"""
|
||||
This function creates the signature. It takes the private key and the binary file
|
||||
and returns the tuple (signature, public key)
|
||||
"""
|
||||
|
||||
signing_key = nacl.signing.SigningKey(
|
||||
private_key, encoder=nacl.encoding.HexEncoder)
|
||||
|
||||
# Sign a message with the signing key
|
||||
signed = signing_key.sign(signee_bin, encoder=nacl.encoding.RawEncoder)
|
||||
|
||||
# Obtain the verify key for a given signing key
|
||||
verify_key = signing_key.verify_key
|
||||
|
||||
# Serialize the verify key to send it to a third party
|
||||
verify_key_hex = verify_key.encode(encoder=nacl.encoding.HexEncoder)
|
||||
|
||||
return signed.signature, verify_key_hex
|
||||
|
||||
|
||||
def write_toc(toc_old, bin, signature_name, new_pub_key_index, new_flags):
|
||||
'''
|
||||
Writes a new TOC entry.
|
||||
toc_old: The parsed data_class with the data of the old TOC
|
||||
bin: The binary to insert the new TOC
|
||||
signature_name: 4 char name of the newly added signature
|
||||
new_pub_key_index: Key index of the public key belongs to the signature
|
||||
new_flags: New Toc flags from toc_flag_dict dictionary.
|
||||
|
||||
return: New bin with modified TOC
|
||||
'''
|
||||
|
||||
toc_new = toc_old
|
||||
toc_new.app_signature_key = new_pub_key_index
|
||||
toc_new.app_flags1 = new_flags
|
||||
if len(signature_name) != 4:
|
||||
raise Exception('Signature name has not the right length')
|
||||
toc_new.app_name = bytes(signature_name, 'utf_8')
|
||||
toc_new_bin = toc2bin(toc_new)
|
||||
|
||||
toc_entry_len = struct.calcsize(TOC_entry().STRUCT_STRUCTURE)
|
||||
bin[toc_old.toc_position:toc_old.toc_position+toc_entry_len] = toc_new_bin
|
||||
|
||||
return bin
|
||||
|
||||
|
||||
def cli():
|
||||
'''
|
||||
Comand lined interface to the signtool.
|
||||
See usage comand.
|
||||
|
||||
return: class with parsed arguments.
|
||||
'''
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Tool to extract and find crypto TOC from .px4 file. And append a signature from a given private key')
|
||||
# defining arguments for parser object
|
||||
parser.add_argument("--signee", type=str,
|
||||
metavar="file_path", default=None,
|
||||
help="Opens and reads the specified px4 file.")
|
||||
parser.add_argument("--private_key", type=str,
|
||||
metavar="key string", default=None,
|
||||
help="Private key to sign the px4 image")
|
||||
parser.add_argument("--key_index", type=int,
|
||||
metavar="Number", default=None,
|
||||
help="Index of the public key used to verify the binary")
|
||||
|
||||
parser.add_argument("--TOC_flags", type=str, choices=['TOC_FLAG1_BOOT', 'TOC_FLAG1_RDCT'],
|
||||
default=None,
|
||||
help="TOC flags to indicate signature")
|
||||
args = parser.parse_args()
|
||||
return args
|
||||
|
||||
|
||||
def sign(file_path, private_key, key_index, TOC_flags):
|
||||
'''
|
||||
Signs a binary file and updates TOC accordingly.
|
||||
Reads a .px4 or a .bin at specified location and writes a new _signed.[px4|bin] file at the same location.
|
||||
|
||||
file_path: Path to a .px4 or .bin file to sign.
|
||||
private_key: String of the private key used to sign the binary.
|
||||
key_index: Index of the public key used to verify signature.
|
||||
TOC_flags: New toc flags to be written.
|
||||
'''
|
||||
head,tail=os.path.splitext(file_path)
|
||||
if(tail == '.px4'):
|
||||
bin, json_data = unpackPx4(file_path)
|
||||
elif(tail == '.bin'):
|
||||
with open(file_path,mode='rb') as f:
|
||||
bin =bytearray(f.read())
|
||||
else:
|
||||
raise Exception('Error: Unknown file type')
|
||||
|
||||
|
||||
toc_old = parse_toc(bin)
|
||||
new_bin = write_toc(toc_old, bin, 'MSTR', key_index,
|
||||
toc_flag_dict[TOC_flags])
|
||||
app_len = toc_old.app_end - toc_old.app_start
|
||||
print('Calculate new signature')
|
||||
signature, pub_key = cryptotools.ed25519_sign(
|
||||
private_key, bytes(new_bin[0:app_len]))
|
||||
# Append signature to binary
|
||||
print('Append signature to binary: ',signature.hex(),'\n','length: ',app_len )
|
||||
|
||||
sig_start=toc_old.sig_start - toc_old.app_start
|
||||
sig_end = toc_old.sig_end - toc_old.app_start
|
||||
|
||||
if sig_start != app_len:
|
||||
padding = bytearray(b'\xff')*(sig_start - app_len)
|
||||
new_bin += padding
|
||||
|
||||
new_bin[toc_old.sig_start-toc_old.app_start:toc_old.sig_end -
|
||||
toc_old.app_start] = signature
|
||||
|
||||
if(tail == '.px4'):
|
||||
packPx4(new_bin, json_data, file_path,pub_key)
|
||||
elif(tail == '.bin'):
|
||||
head,tail=os.path.splitext(file_path)
|
||||
with open(head +'_signed.bin',mode='wb') as f:
|
||||
f.write(bin)
|
||||
else:
|
||||
raise Exception('Error: Unknown file type!')
|
||||
|
||||
|
||||
|
||||
if(__name__ == "__main__"):
|
||||
args = cli()
|
||||
# test input to the sign function if not run from CLI
|
||||
# sign('PX4-Autopilot/build/px4_fmu-v5x_default/px4_fmu-v5x_default.px4',
|
||||
# "8e969454c3f1ba924b4d436bc58b87d20c082ea99fb6a77b9e00f7b20dab0fd8",
|
||||
# 0, 'TOC_FLAG1_BOOT')
|
||||
# CLI test input:
|
||||
# --signee /PX4-Autopilot/build/px4_fmu-v5x_default/px4_fmu-v5x_default.px4
|
||||
# --private_key 8e969454c3f1ba924b4d436bc58b87d20c082ea99fb6a77b9e00f7b20dab0fd8 --key_index 0 --TOC_flags TOC_FLAG1_BOOT
|
||||
sign(args.signee, args.private_key, args.key_index, args.TOC_flags)
|
||||
+1
-1
Submodule Tools/sitl_gazebo updated: 5610c3fb44...fd8637d32e
Reference in New Issue
Block a user