diff --git a/.gitignore b/.gitignore index 165a8e361c..f90db3cf6f 100644 --- a/.gitignore +++ b/.gitignore @@ -108,3 +108,4 @@ src/systemcmds/topic_listener/listener_generated.cpp # colcon log/ +keys/ diff --git a/Tools/cryptotools.py b/Tools/cryptotools.py deleted file mode 100755 index f452bea444..0000000000 --- a/Tools/cryptotools.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python3 - -import nacl.encoding -import nacl.signing -import nacl.hash -import struct -import binascii -import json -import time -import argparse -from pathlib import Path -import sys - -def make_public_key_h_file(signing_key,key_name): - """ - This file generate the public key header file - to be included into the bootloader build. - """ - public_key_c='\n' - for i,c in enumerate(signing_key.verify_key.encode(encoder=nacl.encoding.RawEncoder)): - public_key_c+= hex(c) - public_key_c+= ', ' - if((i+1)%8==0): - public_key_c+= '\n' - with open(key_name+'.pub' ,mode='w') as f: - f.write("//Public key to verify signed binaries") - f.write(public_key_c) - -def make_key_file(signing_key, key_name): - """ - Writes the key.json file. - Attention do not override your existing key files. - Do not publish your private key!! - """ - - key_file = Path(key_name+'.json') - if key_file.is_file(): - print("ATTENTION: key.json already exists, are you sure you want to overwrite it?") - print("Remove file and run script again.") - print("Script aborted!") - sys.exit(1) - - keys={} - keys["date"] = time.asctime() - keys["public"] = (signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder)).decode() - keys["private"] = binascii.hexlify(signing_key._seed).decode() - #print (keys) - with open(key_name+'.json', "w") as write_file: - json.dump(keys, write_file) - return keys - -def ed25519_sign(private_key, signee_bin): - """ - This function creates the signature. It takes the private key and the binary file - and returns the tuple (signature, public key) - """ - - signing_key = nacl.signing.SigningKey(private_key, encoder=nacl.encoding.HexEncoder) - - # Sign a message with the signing key - signed = signing_key.sign(signee_bin,encoder=nacl.encoding.RawEncoder) - - # Obtain the verify key for a given signing key - verify_key = signing_key.verify_key - - # Serialize the verify key to send it to a third party - verify_key_hex = verify_key.encode(encoder=nacl.encoding.HexEncoder) - - return signed.signature, verify_key_hex - - -def sign(bin_file_path, key_file_path=None, generated_key_file=None): - """ - reads the binary file and the key file. - If the key file does not exist, it generates a - new key file. - """ - - with open(bin_file_path,mode='rb') as f: - signee_bin = f.read() - # Align to 4 bytes. Signature always starts at - # 4 byte aligned address, but the signee size - # might not be aligned - if len(signee_bin)%4 != 0: - signee_bin += bytearray(b'\xff')*(4-len(signee_bin)%4) - - try: - with open(key_file_path,mode='r') as f: - keys = json.load(f) - #print(keys) - except: - print('ERROR: Key file',key_file_path,'not found') - sys.exit(1) - - signature, public_key = ed25519_sign(keys["private"], signee_bin) - - # Do a sanity check. This type of signature is always 64 bytes long - assert len(signature) == 64 - - # Print out the signing information - print("Binary \"%s\" signed."%bin_file_path) - print("Signature:",binascii.hexlify(signature)) - print("Public key:",binascii.hexlify(public_key)) - - return signee_bin + signature, public_key - -def generate_key(key_file): - """ - Generate two files: - "key_file.pub" containing the public key in C-format to be included in the bootloader build - "key_file.json, containt both private and public key. - Do not leak or loose the key file. This is mandatory for signing - all future binaries you want to deploy! - """ - - # Generate a new random signing key - signing_key = nacl.signing.SigningKey.generate() - # Serialize the verify key to send it to a third party - verify_key_hex = signing_key.verify_key.encode(encoder=nacl.encoding.HexEncoder) - print("public key :",verify_key_hex) - - private_key_hex=binascii.hexlify(signing_key._seed) - print("private key :",private_key_hex) - - keys = make_key_file(signing_key,key_file) - make_public_key_h_file(signing_key,key_file) - return keys - -if(__name__ == "__main__"): - - parser = argparse.ArgumentParser(description="""CLI tool to calculate and add signature to px4. bin files\n - if given it takes an existing key file, else it generate new keys""", - epilog="Output: SignedBin.bin and a key.json file") - parser.add_argument("signee", help=".bin file to add signature", nargs='?', default=None) - parser.add_argument("signed", help="signed output .bin", nargs='?', default=None) - - parser.add_argument("--key", help="key.json file", default="Tools/test_keys/test_keys.json") - parser.add_argument("--rdct", help="binary R&D certificate file", default=None) - parser.add_argument("--genkey", help="new generated key", default=None) - args = parser.parse_args() - - # Only generate a key pair, don't sign - if args.genkey: - # Only create a key file, don't sign - generate_key(args.genkey) - print('New key file generated:',args.genkey) - sys.exit(0); - - # Check that both signee and signed exist - if not args.signee or not args.signed: - print("ERROR: Must either provide file names for both signee and signed") - print(" or --genkey [key] to generate a new key pair") - sys.exit(1) - - # Issue a warning when signing with testing key - if args.key=='Tools/test_keys/test_keys.json': - print("WARNING: Signing with PX4 test key") - - # Sign the binary - signed, public_key = sign(args.signee, args.key, args.genkey) - - with open(args.signed, mode='wb') as fs: - # Write signed binary - fs.write(signed) - - # Append rdcert if given - try: - with open(args.rdct ,mode='rb') as f: - with open(args.signed, mode='ab') as fs: - fs.write(f.read()) - except: - pass diff --git a/Tools/decrypt_ulog.py b/Tools/decrypt_ulog.py deleted file mode 100755 index 3015686b97..0000000000 --- a/Tools/decrypt_ulog.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python3 - -import sys - -try: - from Crypto.Cipher import ChaCha20 -except ImportError as e: - print("Failed to import crypto: " + str(e)) - print("") - print("You may need to install it using:") - print(" pip3 install --user pycryptodome") - print("") - sys.exit(1) - -from Crypto.PublicKey import RSA -from Crypto.Cipher import PKCS1_OAEP -from Crypto.Hash import SHA256 -from pathlib import Path -import argparse - - -if __name__ == "__main__": - - parser = argparse.ArgumentParser(description="""CLI tool to decrypt an ulog file\n""") - parser.add_argument("ulog_file", help=".ulge/.ulgc, encrypted log file", nargs='?', default=None) - parser.add_argument("ulog_key", help=".ulgk, legacy encrypted key (give empty string '' to ignore for .ulge)", nargs='?', default=None) - parser.add_argument("rsa_key", help=".pem format key for decrypting the ulog key", nargs='?', default=None) - - args = parser.parse_args() - - # Check all arguments are given - if not args.rsa_key: - print('Need all arguments, the encrypted ulog file, key file (or empty string if not needed) and the key decryption key (.pem)') - sys.exit(1) - - # Read the private RSA key to decrypt the cahcha key - with open(args.rsa_key, 'rb') as f: - r = RSA.importKey(f.read(), passphrase='') - - if args.ulog_key == "": - key_data_filename = args.ulog_file - magic = "ULogEnc" - else: - key_data_filename = args.ulog_key - magic = "ULogKey" - - with open(key_data_filename, 'rb') as f: - # Read the encrypted xchacha key and the nonce - ulog_key_header = f.read(22) - - # Parse the header - try: - # magic - if not ulog_key_header.startswith(bytearray(magic.encode())): - print("Incorrect header magic") - raise Exception() - # version - if ulog_key_header[7] != 1: - print("Unsupported header version") - raise Exception() - # expected key exchange algorithm (RSA_OAEP) - if ulog_key_header[16] != 4: - print("Unsupported key algorithm") - raise Exception() - key_size = ulog_key_header[19] << 8 | ulog_key_header[18] - nonce_size = ulog_key_header[21] << 8 | ulog_key_header[20] - ulog_key_cipher = f.read(key_size) - nonce = f.read(nonce_size) - except: - print("Keydata format error") - sys.exit(1) - - if magic == "ULogEnc": - data_offset = 22 + key_size + nonce_size - else: - data_offset = 0 - - # Decrypt the xchacha key - cipher_rsa = PKCS1_OAEP.new(r,SHA256) - ulog_key = cipher_rsa.decrypt(ulog_key_cipher) - #print(binascii.hexlify(ulog_key)) - - # Read and decrypt the ulog data - cipher = ChaCha20.new(key=ulog_key, nonce=nonce) - - outfilename = Path(args.ulog_file).stem + ".ulog" - with open(args.ulog_file, 'rb') as f: - if data_offset > 0: - f.seek(data_offset) - with open(outfilename, 'wb') as out: - out.write(cipher.decrypt(f.read())) diff --git a/Tools/log_encryption/README.md b/Tools/log_encryption/README.md new file mode 100644 index 0000000000..a4408c0424 --- /dev/null +++ b/Tools/log_encryption/README.md @@ -0,0 +1,86 @@ +# PX4 Log Encryption Tools + +Tools for generating encryption keys, building PX4 firmware with encrypted logs, downloading logs, and decrypting them. + +For more information see: https://docs.px4.io/main/en/dev_log/log_encryption.html + +## Usage + +1. **Get the board file**: + In order to use these tools you need to create an `encrypted_logs` target in your target board directory. For example: + ```bash + encrypted_logs.px4board + ``` + Using `make menuconfig` you should enable these settings: `Blake2s hash algorithm`, `entropy pool` and `strong random number generator` and select `use interrupts` to feed timing randomness to the entropy pool. + Once you have generated the keys make sure you add them to the boardconfig. + + ```bash + make _encrypted_logs menuconfig + ``` + +2. **Generate Keys**: + ```bash + cd PX4-Autopilot/Tools/log_encryption + python3 generate_keys.py + ``` + + Make sure you have the right key in your board file + ```CONFIG_PUBLIC_KEY1="../../../keys/public/public_key.pub"``` + +3. **Build Firmware**: + ```bash + cd PX4-Autopilot + + AND + + make _encrypted_logs + + FOR INSTANCE + make_ark_fpv_encrypted_logs + + Upload the custom firmware on your flight controller and record some logs + ``` + +4. **Download Logs**: + ```bash + cd PX4-Autopilot/Tools/log_encryption + + python3 download_logs.py /dev/ttyACM0 --baudrate 57600 + + OR + + python3 download_logs.py udp:0.0.0.0:14550 + ``` + + Addresses might need to be adjusted + +5. **Decrypt Logs**: + The easiest way to run this is to have your private key and encrypted logs in the following folders respectively: + ```bash + PX4-Autopilot/keys/private + PX4-Autopilot/logs/encrypted + ``` + Then run: + ```bash + cd PX4-Autopilot/Tools/log_encryption + + AND + # Uses default key + default folder + python3 decrypt_logs.py + + OR + # Use --help to get all the options + python3 decrypt_logs.py --help + ``` + + Your decrypted logs can be found in: + ```bash + PX4-Autopilot/logs/decrypted + ``` + Otherwise + +## Directory Structure + +- **`keys/`**: Encryption keys. +- **`logs/encrypted/`**: Downloaded encrypted logs. +- **`logs/decrypted/`**: Decrypted logs. diff --git a/Tools/log_encryption/decrypt_logs.py b/Tools/log_encryption/decrypt_logs.py new file mode 100644 index 0000000000..448fcc9d1b --- /dev/null +++ b/Tools/log_encryption/decrypt_logs.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +import os +import sys +import argparse +from pathlib import Path + +try: + from Crypto.Cipher import ChaCha20 + from Crypto.PublicKey import RSA + from Crypto.Cipher import PKCS1_OAEP + from Crypto.Hash import SHA256 +except ImportError as e: + print("Failed to import crypto: " + str(e)) + print("You may need to install it using:") + print(" pip3 install --user pycryptodome") + sys.exit(1) + +PX4_MAIN_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) +ENCRYPTED_LOGS_DIR = os.path.join(PX4_MAIN_DIR, "logs/encrypted") +DECRYPTED_LOGS_DIR = os.path.join(PX4_MAIN_DIR, "logs/decrypted") +DEFAULT_PRIVATE_KEY = os.path.join(PX4_MAIN_DIR, "keys/private/private_key.pem") + +def decrypt_log_file(ulog_file, private_key, output_folder): + """Decrypts a single encrypted log file (.ulge) and saves it as .ulg in the output folder.""" + + try: + # Read the private RSA key + with open(private_key, 'rb') as f: + key = RSA.import_key(f.read()) + + magic = "ULogEnc" + header_size = 22 + + with open(ulog_file, 'rb') as f: + # Encrypted .ulge file contains following sections: + # ------------------------- + # | Header | + # ------------------------- + # | Wrapped symmetric key | + # ------------------------- + # | Encrypted ulog data | + # ------------------------- + header = f.read(header_size) + + # Parse the header + if not header.startswith(bytearray(magic.encode())): + print(f"Skipping {ulog_file}: Incorrect header magic") + return + if header[7] != 1: + print(f"Skipping {ulog_file}: Unsupported header version") + return + if header[16] != 4: + print(f"Skipping {ulog_file}: Unsupported key algorithm") + return + + key_size = header[19] << 8 | header[18] + nonce_size = header[21] << 8 | header[20] + cipher = f.read(key_size) + nonce = f.read(nonce_size) + + data_offset = header_size + key_size + nonce_size + + # Try to decrypt the ChaCha key + cipher_rsa = PKCS1_OAEP.new(key, SHA256) + try: + ulog_key = cipher_rsa.decrypt(cipher) + except ValueError: + print(f"Skipping {ulog_file}: Incorrect decryption (wrong key)") + return + + # Read and decrypt the log data + cipher = ChaCha20.new(key=ulog_key, nonce=nonce) + + # Save decrypted log with .ulg extension + output_path = os.path.join(output_folder, Path(ulog_file).stem + ".ulg") + with open(ulog_file, 'rb') as f: + if data_offset > 0: + f.seek(data_offset) + with open(output_path, 'wb') as out: + out.write(cipher.decrypt(f.read())) + + print(f"{output_path}") + + except Exception as e: + print(f"Skipping {ulog_file}: Error occurred - {e}") + + +def decrypt_all_logs(private_key_path, log_source_path=None): + """Decrypts all logs in the given folder or a single file.""" + + if log_source_path and os.path.isfile(log_source_path): + logs = [log_source_path] + else: + # Use default encrypted logs directory if not provided + folder = log_source_path if log_source_path else ENCRYPTED_LOGS_DIR + logs = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".ulge")] + + if not logs: + print("No encrypted logs found.") + return + + print(f"Found {len(logs)} encrypted log(s). Decrypting...") + + os.makedirs(DECRYPTED_LOGS_DIR, exist_ok=True) + + for log_path in logs: + decrypt_log_file(log_path, private_key_path, DECRYPTED_LOGS_DIR) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Decrypt PX4 encrypted log files (.ulge) using a ChaCha20+RSA scheme.\n\n" + "Usage examples:\n" + " python3 decrypt_logs.py /path/to/private_key.pem /path/to/custom_log.ulge\n" + " python3 decrypt_logs.py /path/to/private_key.pem /path/to/folder_with_ulge_files\n" + " python3 decrypt_logs.py # Uses default key + default log folder\n", + formatter_class=argparse.RawTextHelpFormatter + ) + + parser.add_argument("private_key", nargs="?", default=None, + help="Path to the private RSA key (.pem). If omitted, uses default key.") + parser.add_argument("log_file_or_folder", nargs="?", default=None, + help="Path to a single .ulge file or folder containing them. If omitted, uses default encrypted log folder.") + + + args = parser.parse_args() + + private_key_path = args.private_key if args.private_key else DEFAULT_PRIVATE_KEY + + if not os.path.exists(private_key_path): + print(f"Error: Private key file not found at {private_key_path}") + sys.exit(1) + + decrypt_all_logs(private_key_path, args.log_file_or_folder) diff --git a/Tools/log_encryption/download_logs.py b/Tools/log_encryption/download_logs.py new file mode 100644 index 0000000000..18423f804d --- /dev/null +++ b/Tools/log_encryption/download_logs.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 + +import os +import sys +import time +import shutil +import threading +from pymavlink import mavutil +from argparse import ArgumentParser + +class MavlinkLogDownloader: + def __init__(self, connection_url, output_dir, baudrate=57600, source_system=254): + self.connection_url = connection_url + self.output_dir = output_dir + self.encrypted_dir = os.path.join(output_dir, "encrypted") + self.running = True + + # Ensure directories exist + os.makedirs(self.output_dir, exist_ok=True) + os.makedirs(self.encrypted_dir, exist_ok=True) + + # # Handle serial or UDP connections + if os.path.exists(connection_url): # likely a serial device + self.mav = mavutil.mavlink_connection(connection_url, baud=baudrate, source_system=source_system) + else: + self.mav = mavutil.mavlink_connection(connection_url, source_system=source_system) + + + self.mav.WIRE_PROTOCOL_VERSION = "2.0" + + # Start heartbeat thread + self.heartbeat_thread = threading.Thread(target=self.send_heartbeat_thread) + self.heartbeat_thread.daemon = True + self.heartbeat_thread.start() + + self.mav.wait_heartbeat() + print(f"Heartbeat received from system {self.mav.target_system}, component {self.mav.target_component}") + + # Waking up the autopilot, it is needed to ensure we get answer for log request + self.mav.mav.command_long_send( + self.mav.target_system, + self.mav.target_component, + mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE, # Command ID 512 + 0, # Confirmation + mavutil.mavlink.MAVLINK_MSG_ID_AUTOPILOT_VERSION, # param1: Message ID 148 + 0, 0, 0, 0, 0, 0 # params 2–7 are not used for this message + ) + + # Allow heartbeats to establish connection + time.sleep(3) + + + def send_heartbeat_thread(self): + while self.running: + self.mav.mav.heartbeat_send( + mavutil.mavlink.MAV_TYPE_GCS, + mavutil.mavlink.MAV_AUTOPILOT_GENERIC, + 0, 0, 0 + ) + time.sleep(1) + + + def download_logs(self): + """Downloads logs to the output_dir.""" + print("Request logs...") + self.mav.mav.log_request_list_send(self.mav.target_system, self.mav.target_component, 0, 0xFFFF) + + log_entries = {} + total_logs = None + start_time = time.time() + last_entry_time = None + + while True: + current_time = time.time() + + # Case 1: If we haven't received any entries yet and we've waited more than 5 seconds + if not log_entries and current_time > start_time + 5: + print("Timed out waiting for first log entry (5s)") + break + + # Case 2: If we have received at least one entry and it's been more than 3 seconds since the last one + if last_entry_time and current_time - last_entry_time > 3: + print(f"No new log entries received for 3 seconds. Moving on.") + break + + # Case 3: If we've received all expected logs + if total_logs is not None and len(log_entries) >= total_logs: + print(f"Received all {total_logs} log entries.") + break + + msg = self.mav.recv_match(type='LOG_ENTRY', blocking=True, timeout=1) + + if msg and msg.id not in log_entries: + last_entry_time = time.time() + log_entries[msg.id] = msg + + if total_logs is None: + total_logs = msg.num_logs + + print(f"Log ID: {msg.id}, Size: {msg.size} bytes, Date: {msg.time_utc} ({len(log_entries)}/{total_logs})") + + if not log_entries: + print("No log entries found.") + return + + for entry in log_entries.values(): + self.download_log_file(entry) + + self.classify_logs() + + + def download_log_file(self, log_entry): + """Downloads a log file to the output_dir.""" + log_id = log_entry.id + log_size = log_entry.size + log_date = time.strftime("%Y-%m-%d_%H-%M-%S", time.gmtime(log_entry.time_utc)) + output_filename = os.path.join(self.output_dir, f"log-{log_date}_{log_id}.ulg") + + print(f"Downloading log {log_id} ({log_size} bytes) to {output_filename}...") + + with open(output_filename, 'wb') as f: + self.mav.mav.log_request_data_send(self.mav.target_system, self.mav.target_component, log_id, 0, 0xFFFFFFFF) + + bytes_received = 0 + while bytes_received < log_size: + msg = self.mav.recv_match(type='LOG_DATA', blocking=True, timeout=5) + if msg: + data_bytes = bytes(msg.data[:msg.count]) + f.write(data_bytes) + bytes_received += msg.count + else: + print("Timeout waiting for log data.") + break + + print(f"Finished downloading log {log_id}.") + + def classify_logs(self): + """Classifies logs as encrypted (.ulge) based on file content.""" + for log_file in os.listdir(self.output_dir): + log_path = os.path.join(self.output_dir, log_file) + + if not os.path.isfile(log_path): + continue + + # Read first 10 bytes to check for "ULogEnc" + with open(log_path, 'rb') as f: + first_bytes = f.read(10) + + if b'ULogEnc' in first_bytes: + new_filename = log_file.replace(".ulg", ".ulge") + new_path = os.path.join(self.encrypted_dir, new_filename) + print(f"Found encrypted log: {new_path}") + shutil.move(log_path, new_path) + + + def cleanup(self): + """Stop the heartbeat thread and clean up resources.""" + self.running = False + if self.heartbeat_thread.is_alive(): + self.heartbeat_thread.join(timeout=2.0) + + +def main(): + parser = ArgumentParser(description="Download PX4 log files over MAVLink.") + parser.add_argument('connection_url', help="MAVLink connection URL (e.g., udp:0.0.0.0:14550, /dev/ttyACM0 --baudrate 57600)") + parser.add_argument('--output', '-o', default=os.path.join(os.path.dirname(__file__), "../..", "logs"), help="Output directory for log files (default: ../../logs)") + parser.add_argument('--baudrate', type=int, default=57600, help="Baudrate for serial connection (default: 57600)") + parser.add_argument('--source-system', type=int, default=254, help="MAVLink source system ID (default: 254)") + + + args = parser.parse_args() + + output_dir = os.path.abspath(args.output) + + print(f"Connecting to {args.connection_url}...") + log_downloader = MavlinkLogDownloader( + args.connection_url, + output_dir, + baudrate=args.baudrate, + source_system=args.source_system + ) + + + try: + log_downloader.download_logs() + finally: + log_downloader.cleanup() + +if __name__ == '__main__': + main() diff --git a/Tools/log_encryption/generate_keys.py b/Tools/log_encryption/generate_keys.py new file mode 100644 index 0000000000..5847de9351 --- /dev/null +++ b/Tools/log_encryption/generate_keys.py @@ -0,0 +1,59 @@ +import os +import subprocess + +# Define the main PX4 directory (one level up from Tools) +PX4_MAIN_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) + +# Define the key folder paths +KEY_FOLDER = os.path.join(PX4_MAIN_DIR, "keys") +PUBLIC_KEY_FOLDER = os.path.join(KEY_FOLDER, "public") +PRIVATE_KEY_FOLDER = os.path.join(KEY_FOLDER, "private") + +# Define the key file paths +PRIVATE_KEY_PATH = os.path.join(PRIVATE_KEY_FOLDER, "private_key.pem") +PUBLIC_KEY_DER_PATH = os.path.join(PUBLIC_KEY_FOLDER, "public_key.der") +PUBLIC_KEY_PUB_PATH = os.path.join(PUBLIC_KEY_FOLDER, "public_key.pub") + +def create_key_folders(): + """Creates key, public, and private folders if they do not exist.""" + for folder in [KEY_FOLDER, PUBLIC_KEY_FOLDER, PRIVATE_KEY_FOLDER]: + if not os.path.exists(folder): + os.makedirs(folder) + print(f"Created '{folder}' directory.") + else: + print(f"'{folder}' directory already exists.") + +def generate_private_key(): + """Generates a private key if it does not exist.""" + if not os.path.exists(PRIVATE_KEY_PATH): + print("Generating private key...") + subprocess.run(["openssl", "genpkey", "-algorithm", "RSA", "-out", PRIVATE_KEY_PATH, "-pkeyopt", "rsa_keygen_bits:2048"]) + print(f"Private key generated at: {PRIVATE_KEY_PATH}") + else: + print("Private key already exists.") + +def generate_public_key(): + """Generates a public key in DER and PUB formats if they do not exist.""" + if not os.path.exists(PUBLIC_KEY_DER_PATH): + print("Generating public key in DER format...") + subprocess.run(["openssl", "rsa", "-pubout", "-in", PRIVATE_KEY_PATH, "-outform", "DER", "-out", PUBLIC_KEY_DER_PATH]) + print(f"Public key (DER) generated at: {PUBLIC_KEY_DER_PATH}") + else: + print("Public key (DER) already exists.") + + if not os.path.exists(PUBLIC_KEY_PUB_PATH): + print("Generating public key in hex format...") + with open(PUBLIC_KEY_PUB_PATH, "w") as pub_file: + process = subprocess.Popen(["xxd", "-p", PUBLIC_KEY_DER_PATH], stdout=subprocess.PIPE) + output, _ = process.communicate() + hex_string = output.decode().strip().replace("\n", "") + formatted_hex = ", ".join(f"0x{hex_string[i:i+2]}" for i in range(0, len(hex_string), 2)) + pub_file.write(formatted_hex) + print(f"Public key (hex) generated at: {PUBLIC_KEY_PUB_PATH}") + else: + print("Public key (hex) already exists.") + +if __name__ == "__main__": + create_key_folders() + generate_private_key() + generate_public_key() diff --git a/Tools/setup/requirements.txt b/Tools/setup/requirements.txt index 51108b7c71..209c9a4473 100644 --- a/Tools/setup/requirements.txt +++ b/Tools/setup/requirements.txt @@ -26,3 +26,4 @@ setuptools>=39.2.0 six>=1.12.0 toml>=0.9 sympy>=1.10.1 +pycryptodome diff --git a/boards/ark/fmu-v6x/default.px4board b/boards/ark/fmu-v6x/default.px4board index 0d4de23c2f..0ec7b49c42 100644 --- a/boards/ark/fmu-v6x/default.px4board +++ b/boards/ark/fmu-v6x/default.px4board @@ -25,7 +25,16 @@ CONFIG_DRIVERS_IMU_INVENSENSE_IIM42652=y CONFIG_DRIVERS_IMU_INVENSENSE_IIM42653=y CONFIG_DRIVERS_IMU_MURATA_SCH16T=y CONFIG_COMMON_LIGHT=y -CONFIG_COMMON_MAGNETOMETER=y +CONFIG_DRIVERS_MAGNETOMETER_BOSCH_BMM150=y +CONFIG_DRIVERS_MAGNETOMETER_HMC5883=y +CONFIG_DRIVERS_MAGNETOMETER_QMC5883L=y +CONFIG_DRIVERS_MAGNETOMETER_ISENTEK_IST8308=y +CONFIG_DRIVERS_MAGNETOMETER_ISENTEK_IST8310=y +CONFIG_DRIVERS_MAGNETOMETER_LIS3MDL=y +CONFIG_DRIVERS_MAGNETOMETER_LSM303AGR=y +CONFIG_DRIVERS_MAGNETOMETER_RM3100=y +CONFIG_DRIVERS_MAGNETOMETER_MEMSIC_MMC5983MA=y +CONFIG_DRIVERS_MAGNETOMETER_ST_IIS2MDC=y CONFIG_DRIVERS_POWER_MONITOR_INA226=y CONFIG_DRIVERS_POWER_MONITOR_INA228=y CONFIG_DRIVERS_POWER_MONITOR_INA238=y diff --git a/boards/ark/fmu-v6x/encrypted_logs.px4board b/boards/ark/fmu-v6x/encrypted_logs.px4board new file mode 100644 index 0000000000..87707c5e90 --- /dev/null +++ b/boards/ark/fmu-v6x/encrypted_logs.px4board @@ -0,0 +1,8 @@ +CONFIG_DRIVERS_BATT_SMBUS=n +CONFIG_DRIVERS_PX4IO=n +CONFIG_BOARD_CRYPTO=y +CONFIG_DRIVERS_STUB_KEYSTORE=y +CONFIG_DRIVERS_SW_CRYPTO=y +# CONFIG_EKF2_AUX_GLOBAL_POSITION is not set +CONFIG_PUBLIC_KEY0="../../../Tools/test_keys/key0.pub" +CONFIG_PUBLIC_KEY1="../../../Tools/test_keys/rsa2048.pub" diff --git a/boards/ark/fmu-v6x/nuttx-config/nsh/defconfig b/boards/ark/fmu-v6x/nuttx-config/nsh/defconfig index 94fbcc11b5..f949d32dc4 100644 --- a/boards/ark/fmu-v6x/nuttx-config/nsh/defconfig +++ b/boards/ark/fmu-v6x/nuttx-config/nsh/defconfig @@ -84,6 +84,8 @@ CONFIG_CDCACM_RXBUFSIZE=600 CONFIG_CDCACM_TXBUFSIZE=12000 CONFIG_CDCACM_VENDORID=0x3185 CONFIG_CDCACM_VENDORSTR="ARK" +CONFIG_CRYPTO=y +CONFIG_CRYPTO_RANDOM_POOL=y CONFIG_DEBUG_FULLOPT=y CONFIG_DEBUG_HARDFAULT_ALERT=y CONFIG_DEBUG_MEMFAULT=y diff --git a/boards/ark/fpv/encrypted_logs.px4board b/boards/ark/fpv/encrypted_logs.px4board new file mode 100644 index 0000000000..9b24cdbaf2 --- /dev/null +++ b/boards/ark/fpv/encrypted_logs.px4board @@ -0,0 +1,7 @@ + +CONFIG_BOARD_CRYPTO=y +CONFIG_DRIVERS_STUB_KEYSTORE=y +CONFIG_DRIVERS_SW_CRYPTO=y +# CONFIG_EKF2_AUX_GLOBAL_POSITION is not set +CONFIG_PUBLIC_KEY0="../../../Tools/test_keys/key0.pub" +CONFIG_PUBLIC_KEY1="../../../Tools/test_keys/rsa2048.pub" diff --git a/boards/ark/fpv/nuttx-config/nsh/defconfig b/boards/ark/fpv/nuttx-config/nsh/defconfig index f43478025c..bf82ccc0b1 100644 --- a/boards/ark/fpv/nuttx-config/nsh/defconfig +++ b/boards/ark/fpv/nuttx-config/nsh/defconfig @@ -82,6 +82,8 @@ CONFIG_CDCACM_RXBUFSIZE=600 CONFIG_CDCACM_TXBUFSIZE=12000 CONFIG_CDCACM_VENDORID=0x3185 CONFIG_CDCACM_VENDORSTR="ARK" +CONFIG_CRYPTO=y +CONFIG_CRYPTO_RANDOM_POOL=y CONFIG_DEBUG_FULLOPT=y CONFIG_DEBUG_HARDFAULT_ALERT=y CONFIG_DEBUG_MEMFAULT=y diff --git a/boards/ark/pi6x/encrypted_logs.px4board b/boards/ark/pi6x/encrypted_logs.px4board new file mode 100644 index 0000000000..22d01ff7f2 --- /dev/null +++ b/boards/ark/pi6x/encrypted_logs.px4board @@ -0,0 +1,6 @@ +CONFIG_BOARD_CRYPTO=y +CONFIG_DRIVERS_STUB_KEYSTORE=y +CONFIG_DRIVERS_SW_CRYPTO=y +# CONFIG_EKF2_AUX_GLOBAL_POSITION is not set +CONFIG_PUBLIC_KEY0="../../../Tools/test_keys/key0.pub" +CONFIG_PUBLIC_KEY1="../../../Tools/test_keys/rsa2048.pub" diff --git a/boards/ark/pi6x/nuttx-config/nsh/defconfig b/boards/ark/pi6x/nuttx-config/nsh/defconfig index 57ff449302..84ca5b10f9 100644 --- a/boards/ark/pi6x/nuttx-config/nsh/defconfig +++ b/boards/ark/pi6x/nuttx-config/nsh/defconfig @@ -82,6 +82,8 @@ CONFIG_CDCACM_RXBUFSIZE=600 CONFIG_CDCACM_TXBUFSIZE=12000 CONFIG_CDCACM_VENDORID=0x3185 CONFIG_CDCACM_VENDORSTR="ARK" +CONFIG_CRYPTO=y +CONFIG_CRYPTO_RANDOM_POOL=y CONFIG_DEBUG_FULLOPT=y CONFIG_DEBUG_HARDFAULT_ALERT=y CONFIG_DEBUG_MEMFAULT=y