mirror of
https://gitee.com/mirrors_PX4/PX4-Autopilot.git
synced 2026-04-14 10:07:39 +08:00
[Sponsored by ARK] Encryption (#24489)
* Added the board configs for encryption, I had to disable smbus and px4 io in the arkv6x * Added the key generator script * Added the decryptor, logs are needed for it though * Added the log download and modified the decryptor * Quick fixes & README * Additional modifications & cleanup * Tested upd connection Adjusted the log downloader to handle multiple entry responses from the FC Edited README * Reverted IP address change * Added pycryptodome to the requirements.txt * fixes for log download and decryption * Removed old log decryptors and updated README * Pointed the ark borads to the dummy key updated the README accordingly * Adjusted the folders in README, removed new lines * Extended command line arguments for all possibilities for description * Added MAV_CMD_REQUEST_AUTOPILOT_CAPABILITIES after heartbeat received to make sure log request is answered in all cases * Update Tools/log_encryption/README.md Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com> * Update Tools/log_encryption/README.md Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com> * Update Tools/log_encryption/README.md Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com> * Update Tools/log_encryption/README.md Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com> * Edited README, changed the serial connection logic and updated logdownload, made decryption a bit easier to understand * Update Tools/log_encryption/README.md Co-authored-by: Hamish Willee <hamishwillee@gmail.com> * Removed new lines * arkv6x: add individual mags to default.px4board --------- Co-authored-by: Jacob Dahl <dahl.jakejacob@gmail.com> Co-authored-by: Alex Klimaj <alex@arkelectron.com> Co-authored-by: Jacob Dahl <37091262+dakejahl@users.noreply.github.com> Co-authored-by: Hamish Willee <hamishwillee@gmail.com>
This commit is contained in:
parent
9cc1e01bd8
commit
cc492bbf6e
1
.gitignore
vendored
1
.gitignore
vendored
@ -108,3 +108,4 @@ src/systemcmds/topic_listener/listener_generated.cpp
|
||||
|
||||
# colcon
|
||||
log/
|
||||
keys/
|
||||
|
||||
@ -1,172 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import nacl.encoding
|
||||
import nacl.signing
|
||||
import nacl.hash
|
||||
import struct
|
||||
import binascii
|
||||
import json
|
||||
import time
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
def make_public_key_h_file(signing_key,key_name):
|
||||
"""
|
||||
This file generate the public key header file
|
||||
to be included into the bootloader build.
|
||||
"""
|
||||
public_key_c='\n'
|
||||
for i,c in enumerate(signing_key.verify_key.encode(encoder=nacl.encoding.RawEncoder)):
|
||||
public_key_c+= hex(c)
|
||||
public_key_c+= ', '
|
||||
if((i+1)%8==0):
|
||||
public_key_c+= '\n'
|
||||
with open(key_name+'.pub' ,mode='w') as f:
|
||||
f.write("//Public key to verify signed binaries")
|
||||
f.write(public_key_c)
|
||||
|
||||
def make_key_file(signing_key, key_name):
|
||||
"""
|
||||
Writes the key.json file.
|
||||
Attention do not override your existing key files.
|
||||
Do not publish your private key!!
|
||||
"""
|
||||
|
||||
key_file = Path(key_name+'.json')
|
||||
if key_file.is_file():
|
||||
print("ATTENTION: key.json already exists, are you sure you want to overwrite it?")
|
||||
print("Remove file and run script again.")
|
||||
print("Script aborted!")
|
||||
sys.exit(1)
|
||||
|
||||
keys={}
|
||||
keys["date"] = time.asctime()
|
||||
keys["public"] = (signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder)).decode()
|
||||
keys["private"] = binascii.hexlify(signing_key._seed).decode()
|
||||
#print (keys)
|
||||
with open(key_name+'.json', "w") as write_file:
|
||||
json.dump(keys, write_file)
|
||||
return keys
|
||||
|
||||
def ed25519_sign(private_key, signee_bin):
|
||||
"""
|
||||
This function creates the signature. It takes the private key and the binary file
|
||||
and returns the tuple (signature, public key)
|
||||
"""
|
||||
|
||||
signing_key = nacl.signing.SigningKey(private_key, encoder=nacl.encoding.HexEncoder)
|
||||
|
||||
# Sign a message with the signing key
|
||||
signed = signing_key.sign(signee_bin,encoder=nacl.encoding.RawEncoder)
|
||||
|
||||
# Obtain the verify key for a given signing key
|
||||
verify_key = signing_key.verify_key
|
||||
|
||||
# Serialize the verify key to send it to a third party
|
||||
verify_key_hex = verify_key.encode(encoder=nacl.encoding.HexEncoder)
|
||||
|
||||
return signed.signature, verify_key_hex
|
||||
|
||||
|
||||
def sign(bin_file_path, key_file_path=None, generated_key_file=None):
|
||||
"""
|
||||
reads the binary file and the key file.
|
||||
If the key file does not exist, it generates a
|
||||
new key file.
|
||||
"""
|
||||
|
||||
with open(bin_file_path,mode='rb') as f:
|
||||
signee_bin = f.read()
|
||||
# Align to 4 bytes. Signature always starts at
|
||||
# 4 byte aligned address, but the signee size
|
||||
# might not be aligned
|
||||
if len(signee_bin)%4 != 0:
|
||||
signee_bin += bytearray(b'\xff')*(4-len(signee_bin)%4)
|
||||
|
||||
try:
|
||||
with open(key_file_path,mode='r') as f:
|
||||
keys = json.load(f)
|
||||
#print(keys)
|
||||
except:
|
||||
print('ERROR: Key file',key_file_path,'not found')
|
||||
sys.exit(1)
|
||||
|
||||
signature, public_key = ed25519_sign(keys["private"], signee_bin)
|
||||
|
||||
# Do a sanity check. This type of signature is always 64 bytes long
|
||||
assert len(signature) == 64
|
||||
|
||||
# Print out the signing information
|
||||
print("Binary \"%s\" signed."%bin_file_path)
|
||||
print("Signature:",binascii.hexlify(signature))
|
||||
print("Public key:",binascii.hexlify(public_key))
|
||||
|
||||
return signee_bin + signature, public_key
|
||||
|
||||
def generate_key(key_file):
|
||||
"""
|
||||
Generate two files:
|
||||
"key_file.pub" containing the public key in C-format to be included in the bootloader build
|
||||
"key_file.json, containt both private and public key.
|
||||
Do not leak or loose the key file. This is mandatory for signing
|
||||
all future binaries you want to deploy!
|
||||
"""
|
||||
|
||||
# Generate a new random signing key
|
||||
signing_key = nacl.signing.SigningKey.generate()
|
||||
# Serialize the verify key to send it to a third party
|
||||
verify_key_hex = signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder)
|
||||
print("public key :",verify_key_hex)
|
||||
|
||||
private_key_hex=binascii.hexlify(signing_key._seed)
|
||||
print("private key :",private_key_hex)
|
||||
|
||||
keys = make_key_file(signing_key,key_file)
|
||||
make_public_key_h_file(signing_key,key_file)
|
||||
return keys
|
||||
|
||||
if(__name__ == "__main__"):
|
||||
|
||||
parser = argparse.ArgumentParser(description="""CLI tool to calculate and add signature to px4. bin files\n
|
||||
if given it takes an existing key file, else it generate new keys""",
|
||||
epilog="Output: SignedBin.bin and a key.json file")
|
||||
parser.add_argument("signee", help=".bin file to add signature", nargs='?', default=None)
|
||||
parser.add_argument("signed", help="signed output .bin", nargs='?', default=None)
|
||||
|
||||
parser.add_argument("--key", help="key.json file", default="Tools/test_keys/test_keys.json")
|
||||
parser.add_argument("--rdct", help="binary R&D certificate file", default=None)
|
||||
parser.add_argument("--genkey", help="new generated key", default=None)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Only generate a key pair, don't sign
|
||||
if args.genkey:
|
||||
# Only create a key file, don't sign
|
||||
generate_key(args.genkey)
|
||||
print('New key file generated:',args.genkey)
|
||||
sys.exit(0);
|
||||
|
||||
# Check that both signee and signed exist
|
||||
if not args.signee or not args.signed:
|
||||
print("ERROR: Must either provide file names for both signee and signed")
|
||||
print(" or --genkey [key] to generate a new key pair")
|
||||
sys.exit(1)
|
||||
|
||||
# Issue a warning when signing with testing key
|
||||
if args.key=='Tools/test_keys/test_keys.json':
|
||||
print("WARNING: Signing with PX4 test key")
|
||||
|
||||
# Sign the binary
|
||||
signed, public_key = sign(args.signee, args.key, args.genkey)
|
||||
|
||||
with open(args.signed, mode='wb') as fs:
|
||||
# Write signed binary
|
||||
fs.write(signed)
|
||||
|
||||
# Append rdcert if given
|
||||
try:
|
||||
with open(args.rdct ,mode='rb') as f:
|
||||
with open(args.signed, mode='ab') as fs:
|
||||
fs.write(f.read())
|
||||
except:
|
||||
pass
|
||||
@ -1,91 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
|
||||
try:
|
||||
from Crypto.Cipher import ChaCha20
|
||||
except ImportError as e:
|
||||
print("Failed to import crypto: " + str(e))
|
||||
print("")
|
||||
print("You may need to install it using:")
|
||||
print(" pip3 install --user pycryptodome")
|
||||
print("")
|
||||
sys.exit(1)
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import PKCS1_OAEP
|
||||
from Crypto.Hash import SHA256
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
parser = argparse.ArgumentParser(description="""CLI tool to decrypt an ulog file\n""")
|
||||
parser.add_argument("ulog_file", help=".ulge/.ulgc, encrypted log file", nargs='?', default=None)
|
||||
parser.add_argument("ulog_key", help=".ulgk, legacy encrypted key (give empty string '' to ignore for .ulge)", nargs='?', default=None)
|
||||
parser.add_argument("rsa_key", help=".pem format key for decrypting the ulog key", nargs='?', default=None)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Check all arguments are given
|
||||
if not args.rsa_key:
|
||||
print('Need all arguments, the encrypted ulog file, key file (or empty string if not needed) and the key decryption key (.pem)')
|
||||
sys.exit(1)
|
||||
|
||||
# Read the private RSA key to decrypt the cahcha key
|
||||
with open(args.rsa_key, 'rb') as f:
|
||||
r = RSA.importKey(f.read(), passphrase='')
|
||||
|
||||
if args.ulog_key == "":
|
||||
key_data_filename = args.ulog_file
|
||||
magic = "ULogEnc"
|
||||
else:
|
||||
key_data_filename = args.ulog_key
|
||||
magic = "ULogKey"
|
||||
|
||||
with open(key_data_filename, 'rb') as f:
|
||||
# Read the encrypted xchacha key and the nonce
|
||||
ulog_key_header = f.read(22)
|
||||
|
||||
# Parse the header
|
||||
try:
|
||||
# magic
|
||||
if not ulog_key_header.startswith(bytearray(magic.encode())):
|
||||
print("Incorrect header magic")
|
||||
raise Exception()
|
||||
# version
|
||||
if ulog_key_header[7] != 1:
|
||||
print("Unsupported header version")
|
||||
raise Exception()
|
||||
# expected key exchange algorithm (RSA_OAEP)
|
||||
if ulog_key_header[16] != 4:
|
||||
print("Unsupported key algorithm")
|
||||
raise Exception()
|
||||
key_size = ulog_key_header[19] << 8 | ulog_key_header[18]
|
||||
nonce_size = ulog_key_header[21] << 8 | ulog_key_header[20]
|
||||
ulog_key_cipher = f.read(key_size)
|
||||
nonce = f.read(nonce_size)
|
||||
except:
|
||||
print("Keydata format error")
|
||||
sys.exit(1)
|
||||
|
||||
if magic == "ULogEnc":
|
||||
data_offset = 22 + key_size + nonce_size
|
||||
else:
|
||||
data_offset = 0
|
||||
|
||||
# Decrypt the xchacha key
|
||||
cipher_rsa = PKCS1_OAEP.new(r,SHA256)
|
||||
ulog_key = cipher_rsa.decrypt(ulog_key_cipher)
|
||||
#print(binascii.hexlify(ulog_key))
|
||||
|
||||
# Read and decrypt the ulog data
|
||||
cipher = ChaCha20.new(key=ulog_key, nonce=nonce)
|
||||
|
||||
outfilename = Path(args.ulog_file).stem + ".ulog"
|
||||
with open(args.ulog_file, 'rb') as f:
|
||||
if data_offset > 0:
|
||||
f.seek(data_offset)
|
||||
with open(outfilename, 'wb') as out:
|
||||
out.write(cipher.decrypt(f.read()))
|
||||
86
Tools/log_encryption/README.md
Normal file
86
Tools/log_encryption/README.md
Normal file
@ -0,0 +1,86 @@
|
||||
# PX4 Log Encryption Tools
|
||||
|
||||
Tools for generating encryption keys, building PX4 firmware with encrypted logs, downloading logs, and decrypting them.
|
||||
|
||||
For more information see: https://docs.px4.io/main/en/dev_log/log_encryption.html
|
||||
|
||||
## Usage
|
||||
|
||||
1. **Get the board file**:
|
||||
In order to use these tools you need to create an `encrypted_logs` target in your target board directory. For example:
|
||||
```bash
|
||||
encrypted_logs.px4board
|
||||
```
|
||||
Using `make menuconfig` you should enable these settings: `Blake2s hash algorithm`, `entropy pool` and `strong random number generator` and select `use interrupts` to feed timing randomness to the entropy pool.
|
||||
Once you have generated the keys make sure you add them to the boardconfig.
|
||||
|
||||
```bash
|
||||
make <your_board_name>_encrypted_logs menuconfig
|
||||
```
|
||||
|
||||
2. **Generate Keys**:
|
||||
```bash
|
||||
cd PX4-Autopilot/Tools/log_encryption
|
||||
python3 generate_keys.py
|
||||
```
|
||||
|
||||
Make sure you have the right key in your board file
|
||||
```CONFIG_PUBLIC_KEY1="../../../keys/public/public_key.pub"```
|
||||
|
||||
3. **Build Firmware**:
|
||||
```bash
|
||||
cd PX4-Autopilot
|
||||
|
||||
AND
|
||||
|
||||
make <your_board_name>_encrypted_logs
|
||||
|
||||
FOR INSTANCE
|
||||
make_ark_fpv_encrypted_logs
|
||||
|
||||
Upload the custom firmware on your flight controller and record some logs
|
||||
```
|
||||
|
||||
4. **Download Logs**:
|
||||
```bash
|
||||
cd PX4-Autopilot/Tools/log_encryption
|
||||
|
||||
python3 download_logs.py /dev/ttyACM0 --baudrate 57600
|
||||
|
||||
OR
|
||||
|
||||
python3 download_logs.py udp:0.0.0.0:14550
|
||||
```
|
||||
|
||||
Addresses might need to be adjusted
|
||||
|
||||
5. **Decrypt Logs**:
|
||||
The easiest way to run this is to have your private key and encrypted logs in the following folders respectively:
|
||||
```bash
|
||||
PX4-Autopilot/keys/private
|
||||
PX4-Autopilot/logs/encrypted
|
||||
```
|
||||
Then run:
|
||||
```bash
|
||||
cd PX4-Autopilot/Tools/log_encryption
|
||||
|
||||
AND
|
||||
# Uses default key + default folder
|
||||
python3 decrypt_logs.py
|
||||
|
||||
OR
|
||||
# Use --help to get all the options
|
||||
python3 decrypt_logs.py --help
|
||||
```
|
||||
|
||||
Your decrypted logs can be found in:
|
||||
```bash
|
||||
PX4-Autopilot/logs/decrypted
|
||||
```
|
||||
Otherwise
|
||||
|
||||
## Directory Structure
|
||||
|
||||
- **`keys/`**: Encryption keys.
|
||||
- **`logs/encrypted/`**: Downloaded encrypted logs.
|
||||
- **`logs/decrypted/`**: Decrypted logs.
|
||||
135
Tools/log_encryption/decrypt_logs.py
Normal file
135
Tools/log_encryption/decrypt_logs.py
Normal file
@ -0,0 +1,135 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
from Crypto.Cipher import ChaCha20
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import PKCS1_OAEP
|
||||
from Crypto.Hash import SHA256
|
||||
except ImportError as e:
|
||||
print("Failed to import crypto: " + str(e))
|
||||
print("You may need to install it using:")
|
||||
print(" pip3 install --user pycryptodome")
|
||||
sys.exit(1)
|
||||
|
||||
PX4_MAIN_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
|
||||
ENCRYPTED_LOGS_DIR = os.path.join(PX4_MAIN_DIR, "logs/encrypted")
|
||||
DECRYPTED_LOGS_DIR = os.path.join(PX4_MAIN_DIR, "logs/decrypted")
|
||||
DEFAULT_PRIVATE_KEY = os.path.join(PX4_MAIN_DIR, "keys/private/private_key.pem")
|
||||
|
||||
def decrypt_log_file(ulog_file, private_key, output_folder):
|
||||
"""Decrypts a single encrypted log file (.ulge) and saves it as .ulg in the output folder."""
|
||||
|
||||
try:
|
||||
# Read the private RSA key
|
||||
with open(private_key, 'rb') as f:
|
||||
key = RSA.import_key(f.read())
|
||||
|
||||
magic = "ULogEnc"
|
||||
header_size = 22
|
||||
|
||||
with open(ulog_file, 'rb') as f:
|
||||
# Encrypted .ulge file contains following sections:
|
||||
# -------------------------
|
||||
# | Header |
|
||||
# -------------------------
|
||||
# | Wrapped symmetric key |
|
||||
# -------------------------
|
||||
# | Encrypted ulog data |
|
||||
# -------------------------
|
||||
header = f.read(header_size)
|
||||
|
||||
# Parse the header
|
||||
if not header.startswith(bytearray(magic.encode())):
|
||||
print(f"Skipping {ulog_file}: Incorrect header magic")
|
||||
return
|
||||
if header[7] != 1:
|
||||
print(f"Skipping {ulog_file}: Unsupported header version")
|
||||
return
|
||||
if header[16] != 4:
|
||||
print(f"Skipping {ulog_file}: Unsupported key algorithm")
|
||||
return
|
||||
|
||||
key_size = header[19] << 8 | header[18]
|
||||
nonce_size = header[21] << 8 | header[20]
|
||||
cipher = f.read(key_size)
|
||||
nonce = f.read(nonce_size)
|
||||
|
||||
data_offset = header_size + key_size + nonce_size
|
||||
|
||||
# Try to decrypt the ChaCha key
|
||||
cipher_rsa = PKCS1_OAEP.new(key, SHA256)
|
||||
try:
|
||||
ulog_key = cipher_rsa.decrypt(cipher)
|
||||
except ValueError:
|
||||
print(f"Skipping {ulog_file}: Incorrect decryption (wrong key)")
|
||||
return
|
||||
|
||||
# Read and decrypt the log data
|
||||
cipher = ChaCha20.new(key=ulog_key, nonce=nonce)
|
||||
|
||||
# Save decrypted log with .ulg extension
|
||||
output_path = os.path.join(output_folder, Path(ulog_file).stem + ".ulg")
|
||||
with open(ulog_file, 'rb') as f:
|
||||
if data_offset > 0:
|
||||
f.seek(data_offset)
|
||||
with open(output_path, 'wb') as out:
|
||||
out.write(cipher.decrypt(f.read()))
|
||||
|
||||
print(f"{output_path}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Skipping {ulog_file}: Error occurred - {e}")
|
||||
|
||||
|
||||
def decrypt_all_logs(private_key_path, log_source_path=None):
|
||||
"""Decrypts all logs in the given folder or a single file."""
|
||||
|
||||
if log_source_path and os.path.isfile(log_source_path):
|
||||
logs = [log_source_path]
|
||||
else:
|
||||
# Use default encrypted logs directory if not provided
|
||||
folder = log_source_path if log_source_path else ENCRYPTED_LOGS_DIR
|
||||
logs = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".ulge")]
|
||||
|
||||
if not logs:
|
||||
print("No encrypted logs found.")
|
||||
return
|
||||
|
||||
print(f"Found {len(logs)} encrypted log(s). Decrypting...")
|
||||
|
||||
os.makedirs(DECRYPTED_LOGS_DIR, exist_ok=True)
|
||||
|
||||
for log_path in logs:
|
||||
decrypt_log_file(log_path, private_key_path, DECRYPTED_LOGS_DIR)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Decrypt PX4 encrypted log files (.ulge) using a ChaCha20+RSA scheme.\n\n"
|
||||
"Usage examples:\n"
|
||||
" python3 decrypt_logs.py /path/to/private_key.pem /path/to/custom_log.ulge\n"
|
||||
" python3 decrypt_logs.py /path/to/private_key.pem /path/to/folder_with_ulge_files\n"
|
||||
" python3 decrypt_logs.py # Uses default key + default log folder\n",
|
||||
formatter_class=argparse.RawTextHelpFormatter
|
||||
)
|
||||
|
||||
parser.add_argument("private_key", nargs="?", default=None,
|
||||
help="Path to the private RSA key (.pem). If omitted, uses default key.")
|
||||
parser.add_argument("log_file_or_folder", nargs="?", default=None,
|
||||
help="Path to a single .ulge file or folder containing them. If omitted, uses default encrypted log folder.")
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
private_key_path = args.private_key if args.private_key else DEFAULT_PRIVATE_KEY
|
||||
|
||||
if not os.path.exists(private_key_path):
|
||||
print(f"Error: Private key file not found at {private_key_path}")
|
||||
sys.exit(1)
|
||||
|
||||
decrypt_all_logs(private_key_path, args.log_file_or_folder)
|
||||
190
Tools/log_encryption/download_logs.py
Normal file
190
Tools/log_encryption/download_logs.py
Normal file
@ -0,0 +1,190 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import shutil
|
||||
import threading
|
||||
from pymavlink import mavutil
|
||||
from argparse import ArgumentParser
|
||||
|
||||
class MavlinkLogDownloader:
|
||||
def __init__(self, connection_url, output_dir, baudrate=57600, source_system=254):
|
||||
self.connection_url = connection_url
|
||||
self.output_dir = output_dir
|
||||
self.encrypted_dir = os.path.join(output_dir, "encrypted")
|
||||
self.running = True
|
||||
|
||||
# Ensure directories exist
|
||||
os.makedirs(self.output_dir, exist_ok=True)
|
||||
os.makedirs(self.encrypted_dir, exist_ok=True)
|
||||
|
||||
# # Handle serial or UDP connections
|
||||
if os.path.exists(connection_url): # likely a serial device
|
||||
self.mav = mavutil.mavlink_connection(connection_url, baud=baudrate, source_system=source_system)
|
||||
else:
|
||||
self.mav = mavutil.mavlink_connection(connection_url, source_system=source_system)
|
||||
|
||||
|
||||
self.mav.WIRE_PROTOCOL_VERSION = "2.0"
|
||||
|
||||
# Start heartbeat thread
|
||||
self.heartbeat_thread = threading.Thread(target=self.send_heartbeat_thread)
|
||||
self.heartbeat_thread.daemon = True
|
||||
self.heartbeat_thread.start()
|
||||
|
||||
self.mav.wait_heartbeat()
|
||||
print(f"Heartbeat received from system {self.mav.target_system}, component {self.mav.target_component}")
|
||||
|
||||
# Waking up the autopilot, it is needed to ensure we get answer for log request
|
||||
self.mav.mav.command_long_send(
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE, # Command ID 512
|
||||
0, # Confirmation
|
||||
mavutil.mavlink.MAVLINK_MSG_ID_AUTOPILOT_VERSION, # param1: Message ID 148
|
||||
0, 0, 0, 0, 0, 0 # params 2–7 are not used for this message
|
||||
)
|
||||
|
||||
# Allow heartbeats to establish connection
|
||||
time.sleep(3)
|
||||
|
||||
|
||||
def send_heartbeat_thread(self):
|
||||
while self.running:
|
||||
self.mav.mav.heartbeat_send(
|
||||
mavutil.mavlink.MAV_TYPE_GCS,
|
||||
mavutil.mavlink.MAV_AUTOPILOT_GENERIC,
|
||||
0, 0, 0
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def download_logs(self):
|
||||
"""Downloads logs to the output_dir."""
|
||||
print("Request logs...")
|
||||
self.mav.mav.log_request_list_send(self.mav.target_system, self.mav.target_component, 0, 0xFFFF)
|
||||
|
||||
log_entries = {}
|
||||
total_logs = None
|
||||
start_time = time.time()
|
||||
last_entry_time = None
|
||||
|
||||
while True:
|
||||
current_time = time.time()
|
||||
|
||||
# Case 1: If we haven't received any entries yet and we've waited more than 5 seconds
|
||||
if not log_entries and current_time > start_time + 5:
|
||||
print("Timed out waiting for first log entry (5s)")
|
||||
break
|
||||
|
||||
# Case 2: If we have received at least one entry and it's been more than 3 seconds since the last one
|
||||
if last_entry_time and current_time - last_entry_time > 3:
|
||||
print(f"No new log entries received for 3 seconds. Moving on.")
|
||||
break
|
||||
|
||||
# Case 3: If we've received all expected logs
|
||||
if total_logs is not None and len(log_entries) >= total_logs:
|
||||
print(f"Received all {total_logs} log entries.")
|
||||
break
|
||||
|
||||
msg = self.mav.recv_match(type='LOG_ENTRY', blocking=True, timeout=1)
|
||||
|
||||
if msg and msg.id not in log_entries:
|
||||
last_entry_time = time.time()
|
||||
log_entries[msg.id] = msg
|
||||
|
||||
if total_logs is None:
|
||||
total_logs = msg.num_logs
|
||||
|
||||
print(f"Log ID: {msg.id}, Size: {msg.size} bytes, Date: {msg.time_utc} ({len(log_entries)}/{total_logs})")
|
||||
|
||||
if not log_entries:
|
||||
print("No log entries found.")
|
||||
return
|
||||
|
||||
for entry in log_entries.values():
|
||||
self.download_log_file(entry)
|
||||
|
||||
self.classify_logs()
|
||||
|
||||
|
||||
def download_log_file(self, log_entry):
|
||||
"""Downloads a log file to the output_dir."""
|
||||
log_id = log_entry.id
|
||||
log_size = log_entry.size
|
||||
log_date = time.strftime("%Y-%m-%d_%H-%M-%S", time.gmtime(log_entry.time_utc))
|
||||
output_filename = os.path.join(self.output_dir, f"log-{log_date}_{log_id}.ulg")
|
||||
|
||||
print(f"Downloading log {log_id} ({log_size} bytes) to {output_filename}...")
|
||||
|
||||
with open(output_filename, 'wb') as f:
|
||||
self.mav.mav.log_request_data_send(self.mav.target_system, self.mav.target_component, log_id, 0, 0xFFFFFFFF)
|
||||
|
||||
bytes_received = 0
|
||||
while bytes_received < log_size:
|
||||
msg = self.mav.recv_match(type='LOG_DATA', blocking=True, timeout=5)
|
||||
if msg:
|
||||
data_bytes = bytes(msg.data[:msg.count])
|
||||
f.write(data_bytes)
|
||||
bytes_received += msg.count
|
||||
else:
|
||||
print("Timeout waiting for log data.")
|
||||
break
|
||||
|
||||
print(f"Finished downloading log {log_id}.")
|
||||
|
||||
def classify_logs(self):
|
||||
"""Classifies logs as encrypted (.ulge) based on file content."""
|
||||
for log_file in os.listdir(self.output_dir):
|
||||
log_path = os.path.join(self.output_dir, log_file)
|
||||
|
||||
if not os.path.isfile(log_path):
|
||||
continue
|
||||
|
||||
# Read first 10 bytes to check for "ULogEnc"
|
||||
with open(log_path, 'rb') as f:
|
||||
first_bytes = f.read(10)
|
||||
|
||||
if b'ULogEnc' in first_bytes:
|
||||
new_filename = log_file.replace(".ulg", ".ulge")
|
||||
new_path = os.path.join(self.encrypted_dir, new_filename)
|
||||
print(f"Found encrypted log: {new_path}")
|
||||
shutil.move(log_path, new_path)
|
||||
|
||||
|
||||
def cleanup(self):
|
||||
"""Stop the heartbeat thread and clean up resources."""
|
||||
self.running = False
|
||||
if self.heartbeat_thread.is_alive():
|
||||
self.heartbeat_thread.join(timeout=2.0)
|
||||
|
||||
|
||||
def main():
|
||||
parser = ArgumentParser(description="Download PX4 log files over MAVLink.")
|
||||
parser.add_argument('connection_url', help="MAVLink connection URL (e.g., udp:0.0.0.0:14550, /dev/ttyACM0 --baudrate 57600)")
|
||||
parser.add_argument('--output', '-o', default=os.path.join(os.path.dirname(__file__), "../..", "logs"), help="Output directory for log files (default: ../../logs)")
|
||||
parser.add_argument('--baudrate', type=int, default=57600, help="Baudrate for serial connection (default: 57600)")
|
||||
parser.add_argument('--source-system', type=int, default=254, help="MAVLink source system ID (default: 254)")
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
output_dir = os.path.abspath(args.output)
|
||||
|
||||
print(f"Connecting to {args.connection_url}...")
|
||||
log_downloader = MavlinkLogDownloader(
|
||||
args.connection_url,
|
||||
output_dir,
|
||||
baudrate=args.baudrate,
|
||||
source_system=args.source_system
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
log_downloader.download_logs()
|
||||
finally:
|
||||
log_downloader.cleanup()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
59
Tools/log_encryption/generate_keys.py
Normal file
59
Tools/log_encryption/generate_keys.py
Normal file
@ -0,0 +1,59 @@
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
# Define the main PX4 directory (one level up from Tools)
|
||||
PX4_MAIN_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
|
||||
|
||||
# Define the key folder paths
|
||||
KEY_FOLDER = os.path.join(PX4_MAIN_DIR, "keys")
|
||||
PUBLIC_KEY_FOLDER = os.path.join(KEY_FOLDER, "public")
|
||||
PRIVATE_KEY_FOLDER = os.path.join(KEY_FOLDER, "private")
|
||||
|
||||
# Define the key file paths
|
||||
PRIVATE_KEY_PATH = os.path.join(PRIVATE_KEY_FOLDER, "private_key.pem")
|
||||
PUBLIC_KEY_DER_PATH = os.path.join(PUBLIC_KEY_FOLDER, "public_key.der")
|
||||
PUBLIC_KEY_PUB_PATH = os.path.join(PUBLIC_KEY_FOLDER, "public_key.pub")
|
||||
|
||||
def create_key_folders():
|
||||
"""Creates key, public, and private folders if they do not exist."""
|
||||
for folder in [KEY_FOLDER, PUBLIC_KEY_FOLDER, PRIVATE_KEY_FOLDER]:
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
print(f"Created '{folder}' directory.")
|
||||
else:
|
||||
print(f"'{folder}' directory already exists.")
|
||||
|
||||
def generate_private_key():
|
||||
"""Generates a private key if it does not exist."""
|
||||
if not os.path.exists(PRIVATE_KEY_PATH):
|
||||
print("Generating private key...")
|
||||
subprocess.run(["openssl", "genpkey", "-algorithm", "RSA", "-out", PRIVATE_KEY_PATH, "-pkeyopt", "rsa_keygen_bits:2048"])
|
||||
print(f"Private key generated at: {PRIVATE_KEY_PATH}")
|
||||
else:
|
||||
print("Private key already exists.")
|
||||
|
||||
def generate_public_key():
|
||||
"""Generates a public key in DER and PUB formats if they do not exist."""
|
||||
if not os.path.exists(PUBLIC_KEY_DER_PATH):
|
||||
print("Generating public key in DER format...")
|
||||
subprocess.run(["openssl", "rsa", "-pubout", "-in", PRIVATE_KEY_PATH, "-outform", "DER", "-out", PUBLIC_KEY_DER_PATH])
|
||||
print(f"Public key (DER) generated at: {PUBLIC_KEY_DER_PATH}")
|
||||
else:
|
||||
print("Public key (DER) already exists.")
|
||||
|
||||
if not os.path.exists(PUBLIC_KEY_PUB_PATH):
|
||||
print("Generating public key in hex format...")
|
||||
with open(PUBLIC_KEY_PUB_PATH, "w") as pub_file:
|
||||
process = subprocess.Popen(["xxd", "-p", PUBLIC_KEY_DER_PATH], stdout=subprocess.PIPE)
|
||||
output, _ = process.communicate()
|
||||
hex_string = output.decode().strip().replace("\n", "")
|
||||
formatted_hex = ", ".join(f"0x{hex_string[i:i+2]}" for i in range(0, len(hex_string), 2))
|
||||
pub_file.write(formatted_hex)
|
||||
print(f"Public key (hex) generated at: {PUBLIC_KEY_PUB_PATH}")
|
||||
else:
|
||||
print("Public key (hex) already exists.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
create_key_folders()
|
||||
generate_private_key()
|
||||
generate_public_key()
|
||||
@ -26,3 +26,4 @@ setuptools>=39.2.0
|
||||
six>=1.12.0
|
||||
toml>=0.9
|
||||
sympy>=1.10.1
|
||||
pycryptodome
|
||||
|
||||
@ -25,7 +25,16 @@ CONFIG_DRIVERS_IMU_INVENSENSE_IIM42652=y
|
||||
CONFIG_DRIVERS_IMU_INVENSENSE_IIM42653=y
|
||||
CONFIG_DRIVERS_IMU_MURATA_SCH16T=y
|
||||
CONFIG_COMMON_LIGHT=y
|
||||
CONFIG_COMMON_MAGNETOMETER=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_BOSCH_BMM150=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_HMC5883=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_QMC5883L=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_ISENTEK_IST8308=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_ISENTEK_IST8310=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_LIS3MDL=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_LSM303AGR=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_RM3100=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_MEMSIC_MMC5983MA=y
|
||||
CONFIG_DRIVERS_MAGNETOMETER_ST_IIS2MDC=y
|
||||
CONFIG_DRIVERS_POWER_MONITOR_INA226=y
|
||||
CONFIG_DRIVERS_POWER_MONITOR_INA228=y
|
||||
CONFIG_DRIVERS_POWER_MONITOR_INA238=y
|
||||
|
||||
8
boards/ark/fmu-v6x/encrypted_logs.px4board
Normal file
8
boards/ark/fmu-v6x/encrypted_logs.px4board
Normal file
@ -0,0 +1,8 @@
|
||||
CONFIG_DRIVERS_BATT_SMBUS=n
|
||||
CONFIG_DRIVERS_PX4IO=n
|
||||
CONFIG_BOARD_CRYPTO=y
|
||||
CONFIG_DRIVERS_STUB_KEYSTORE=y
|
||||
CONFIG_DRIVERS_SW_CRYPTO=y
|
||||
# CONFIG_EKF2_AUX_GLOBAL_POSITION is not set
|
||||
CONFIG_PUBLIC_KEY0="../../../Tools/test_keys/key0.pub"
|
||||
CONFIG_PUBLIC_KEY1="../../../Tools/test_keys/rsa2048.pub"
|
||||
@ -84,6 +84,8 @@ CONFIG_CDCACM_RXBUFSIZE=600
|
||||
CONFIG_CDCACM_TXBUFSIZE=12000
|
||||
CONFIG_CDCACM_VENDORID=0x3185
|
||||
CONFIG_CDCACM_VENDORSTR="ARK"
|
||||
CONFIG_CRYPTO=y
|
||||
CONFIG_CRYPTO_RANDOM_POOL=y
|
||||
CONFIG_DEBUG_FULLOPT=y
|
||||
CONFIG_DEBUG_HARDFAULT_ALERT=y
|
||||
CONFIG_DEBUG_MEMFAULT=y
|
||||
|
||||
7
boards/ark/fpv/encrypted_logs.px4board
Normal file
7
boards/ark/fpv/encrypted_logs.px4board
Normal file
@ -0,0 +1,7 @@
|
||||
|
||||
CONFIG_BOARD_CRYPTO=y
|
||||
CONFIG_DRIVERS_STUB_KEYSTORE=y
|
||||
CONFIG_DRIVERS_SW_CRYPTO=y
|
||||
# CONFIG_EKF2_AUX_GLOBAL_POSITION is not set
|
||||
CONFIG_PUBLIC_KEY0="../../../Tools/test_keys/key0.pub"
|
||||
CONFIG_PUBLIC_KEY1="../../../Tools/test_keys/rsa2048.pub"
|
||||
@ -82,6 +82,8 @@ CONFIG_CDCACM_RXBUFSIZE=600
|
||||
CONFIG_CDCACM_TXBUFSIZE=12000
|
||||
CONFIG_CDCACM_VENDORID=0x3185
|
||||
CONFIG_CDCACM_VENDORSTR="ARK"
|
||||
CONFIG_CRYPTO=y
|
||||
CONFIG_CRYPTO_RANDOM_POOL=y
|
||||
CONFIG_DEBUG_FULLOPT=y
|
||||
CONFIG_DEBUG_HARDFAULT_ALERT=y
|
||||
CONFIG_DEBUG_MEMFAULT=y
|
||||
|
||||
6
boards/ark/pi6x/encrypted_logs.px4board
Normal file
6
boards/ark/pi6x/encrypted_logs.px4board
Normal file
@ -0,0 +1,6 @@
|
||||
CONFIG_BOARD_CRYPTO=y
|
||||
CONFIG_DRIVERS_STUB_KEYSTORE=y
|
||||
CONFIG_DRIVERS_SW_CRYPTO=y
|
||||
# CONFIG_EKF2_AUX_GLOBAL_POSITION is not set
|
||||
CONFIG_PUBLIC_KEY0="../../../Tools/test_keys/key0.pub"
|
||||
CONFIG_PUBLIC_KEY1="../../../Tools/test_keys/rsa2048.pub"
|
||||
@ -82,6 +82,8 @@ CONFIG_CDCACM_RXBUFSIZE=600
|
||||
CONFIG_CDCACM_TXBUFSIZE=12000
|
||||
CONFIG_CDCACM_VENDORID=0x3185
|
||||
CONFIG_CDCACM_VENDORSTR="ARK"
|
||||
CONFIG_CRYPTO=y
|
||||
CONFIG_CRYPTO_RANDOM_POOL=y
|
||||
CONFIG_DEBUG_FULLOPT=y
|
||||
CONFIG_DEBUG_HARDFAULT_ALERT=y
|
||||
CONFIG_DEBUG_MEMFAULT=y
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user