diff --git a/Tools/simulation/collect_swarm_ulogs.py b/Tools/simulation/collect_swarm_ulogs.py new file mode 100755 index 0000000000..d716a4d900 --- /dev/null +++ b/Tools/simulation/collect_swarm_ulogs.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +"""Collect ULog files from SIH swarm simulation instances. + +Gathers .ulg files from per-instance log directories and copies them +into a single output folder with filenames that identify the source instance. +""" + +import argparse +import glob +import os +import shutil +import sys +from datetime import datetime +from pathlib import Path + + +def find_repo_root() -> Path: + """Walk up from script location to find the repository root.""" + path = Path(__file__).resolve().parent + while path != path.parent: + if (path / "ROMFS").is_dir() and (path / "src").is_dir(): + return path + path = path.parent + return Path(__file__).resolve().parent.parent.parent + + +def parse_instance_number(filepath: str) -> int | None: + """Extract the instance number from a path containing instance_N.""" + parts = Path(filepath).parts + for part in parts: + if part.startswith("instance_"): + try: + return int(part.split("_", 1)[1]) + except (ValueError, IndexError): + continue + return None + + +def collect_ulogs(build_dir: Path, output_dir: Path, max_instances: int | None) -> None: + """Find and copy ULog files from swarm instance directories.""" + pattern = str(build_dir / "instance_*" / "log" / "**" / "*.ulg") + ulog_files = sorted(glob.glob(pattern, recursive=True)) + + if not ulog_files: + print(f"No ULog files found matching: {pattern}") + print() + print("Troubleshooting:") + print(f" - Verify build directory exists: {build_dir}") + print(f" - Check for instance_* subdirectories in {build_dir}") + print(" - Ensure the swarm simulation has run and produced logs") + sys.exit(1) + + # Filter by instance count if specified. + if max_instances is not None: + ulog_files = [ + f for f in ulog_files + if (n := parse_instance_number(f)) is not None and n < max_instances + ] + + if not ulog_files: + assert max_instances is not None + print(f"No ULog files found for the requested instance range (0..{max_instances - 1}).") + sys.exit(1) + + output_dir.mkdir(parents=True, exist_ok=True) + + total_size = 0 + instances_seen: set[int] = set() + copied = 0 + + for src in ulog_files: + instance = parse_instance_number(src) + if instance is None: + print(f" skipping (could not determine instance): {src}") + continue + + instances_seen.add(instance) + original_name = Path(src).name + dest_name = f"drone_{instance:02d}_{original_name}" + dest_path = output_dir / dest_name + + shutil.copy2(src, dest_path) + file_size = os.path.getsize(src) + total_size += file_size + copied += 1 + print(f" [{instance:02d}] {original_name} ({file_size / 1024:.1f} KB)") + + print() + print("Summary") + print("-" * 40) + print(f" Files collected : {copied}") + print(f" Instances : {sorted(instances_seen)}") + print(f" Total size : {total_size / (1024 * 1024):.2f} MB") + print(f" Output directory: {output_dir.resolve()}") + + +def main() -> None: + repo_root = find_repo_root() + default_build = str(repo_root / "build" / "px4_sitl_sih") + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + parser = argparse.ArgumentParser( + description="Collect ULog files from SIH swarm simulation instances.", + ) + parser.add_argument( + "--build-dir", + default=default_build, + help=f"Path to the build directory (default: {default_build})", + ) + parser.add_argument( + "--output-dir", + default=None, + help="Output directory (default: swarm_ulogs_YYYYMMDD_HHMMSS)", + ) + parser.add_argument( + "--instances", + type=int, + default=None, + help="Number of instances to collect from (default: auto-detect)", + ) + + args = parser.parse_args() + + build_dir = Path(args.build_dir) + if not build_dir.is_dir(): + print(f"Error: build directory does not exist: {build_dir}") + sys.exit(1) + + if args.output_dir is not None: + output_dir = Path(args.output_dir) + else: + output_dir = Path(f"swarm_ulogs_{timestamp}") + + print(f"Collecting ULog files from: {build_dir}") + print(f"Output directory: {output_dir}") + print() + + collect_ulogs(build_dir, output_dir, args.instances) + + +if __name__ == "__main__": + main() diff --git a/Tools/simulation/generate_swarm_ulogs.py b/Tools/simulation/generate_swarm_ulogs.py new file mode 100755 index 0000000000..efaf513c40 --- /dev/null +++ b/Tools/simulation/generate_swarm_ulogs.py @@ -0,0 +1,487 @@ +#!/usr/bin/env python3 +"""Generate synthetic ULog files for testing mavsim-viewer swarm replay. + +Creates N drone ULog files with crafted trajectories in valid ULog binary +format, without requiring any real PX4 simulation. +""" + +import argparse +import json +import math +import os +import struct + + +# --------------------------------------------------------------------------- +# ULog binary writer +# --------------------------------------------------------------------------- + +ULOG_MAGIC = b'\x55\x4c\x6f\x67\x01\x12\x35' +ULOG_VERSION = 1 + + +class ULogWriter: + """Write a minimal but valid ULog binary file.""" + + def __init__(self, path: str, timestamp_us: int = 0): + self._path = path + self._fp = open(path, 'wb') + self._msg_id_counter = 0 + self._subscriptions: dict[str, int] = {} # topic_name -> msg_id + self._write_header(timestamp_us) + self._write_flag_bits() + + # -- low-level helpers -------------------------------------------------- + + def _write_raw(self, data: bytes): + self._fp.write(data) + + def _write_message(self, msg_type: int, payload: bytes): + """Write a ULog message: uint16 size + uint8 type + payload.""" + self._write_raw(struct.pack(' int: + """Write an AddSubscription ('A') message. Returns the assigned msg_id.""" + msg_id = self._msg_id_counter + self._msg_id_counter += 1 + self._subscriptions[topic_name] = msg_id + name_bytes = topic_name.encode('ascii') + payload = struct.pack(' bytes: + return struct.pack(' bytes: + return struct.pack(' bytes: + return struct.pack(' bytes: + return struct.pack(' tuple[float, float, float, float]: + return (math.cos(yaw / 2.0), 0.0, 0.0, math.sin(yaw / 2.0)) + + +def smooth_interp(t: float) -> float: + """Cosine interpolation factor 0..1.""" + return 0.5 - 0.5 * math.cos(math.pi * max(0.0, min(1.0, t))) + + +def grid_position(drone_id: int) -> tuple[float, float]: + """4x4 grid, 3m spacing, centered on origin.""" + row = drone_id // 4 + col = drone_id % 4 + x = (row - 1.5) * 3.0 + y = (col - 1.5) * 3.0 + return x, y + + +def circle_position(drone_id: int, num_drones: int, radius: float = 30.0) -> tuple[float, float]: + angle = 2.0 * math.pi * drone_id / num_drones + return radius * math.cos(angle), radius * math.sin(angle) + + +def local_to_global(x: float, y: float, z: float) -> tuple[float, float, float]: + lat = REF_LAT + (x / 111320.0) + lon = REF_LON + (y / (111320.0 * math.cos(math.radians(REF_LAT)))) + alt = REF_ALT - z + return lat, lon, alt + + +# --------------------------------------------------------------------------- +# Phase computations +# --------------------------------------------------------------------------- + +TARGET = (200.0, 0.0, 0.0) # NED target for kamikaze + + +def compute_state(drone_id: int, t: float, num_drones: int, _duration: float): + """Return (x, y, z, vx, vy, vz, yaw, nav_state) for a drone at time t. + + Returns None if the drone hasn't started yet or has impacted. + """ + start_time = drone_id * 0.5 # staggered start + if t < start_time: + return None + + # Failure drone 12 -- crash at t=70, log ends + if drone_id == 12 and t > 70.0: + return None + + # Default nav state + nav_state = NAV_OFFBOARD + yaw = 0.0 + vx, vy, vz = 0.0, 0.0, 0.0 + + gx, gy = grid_position(drone_id) + cx, cy = circle_position(drone_id, num_drones) + + if t < 10.0: + # Phase 1: staggered start, on ground rising slightly + frac = smooth_interp((t - start_time) / max(0.01, 10.0 - start_time)) + x, y = gx * frac, gy * frac + z = 0.0 + nav_state = NAV_TAKEOFF + + elif t < 25.0: + # Phase 2: grid takeoff, ascend to -20 NED + frac = smooth_interp((t - 10.0) / 15.0) + x, y = gx, gy + z = -20.0 * frac + vz = -20.0 / 15.0 if frac < 1.0 else 0.0 + nav_state = NAV_TAKEOFF + + elif t < 40.0: + # Phase 3: grid -> ring morph + frac = smooth_interp((t - 25.0) / 15.0) + x = gx + (cx - gx) * frac + y = gy + (cy - gy) * frac + z = -20.0 + vx = (cx - gx) / 15.0 + vy = (cy - gy) / 15.0 + nav_state = NAV_OFFBOARD + + elif t < 55.0: + # Phase 4: ring cruise CW at 3 m/s + angular_speed = 3.0 / 30.0 # omega = v/r + base_angle = 2.0 * math.pi * drone_id / num_drones + angle = base_angle - angular_speed * (t - 40.0) # CW -> negative + x = 30.0 * math.cos(angle) + y = 30.0 * math.sin(angle) + z = -20.0 + vx = 30.0 * angular_speed * math.sin(angle) + vy = -30.0 * angular_speed * math.cos(angle) + yaw = math.atan2(-y, -x) # face center + nav_state = NAV_OFFBOARD + + else: + # Phase 5/6: Kamikaze dive toward target + # Starting position at t=55 + base_angle = 2.0 * math.pi * drone_id / num_drones + angular_speed = 3.0 / 30.0 + angle55 = base_angle - angular_speed * 15.0 + start_x = 30.0 * math.cos(angle55) + start_y = 30.0 * math.sin(angle55) + start_z = -20.0 + + dx = TARGET[0] - start_x + dy = TARGET[1] - start_y + dz = TARGET[2] - start_z + dist = math.sqrt(dx * dx + dy * dy + dz * dz) + speed = 12.0 # m/s max + nx, ny, nz = dx / dist, dy / dist, dz / dist + + dt = t - 55.0 + x = start_x + nx * speed * dt + y = start_y + ny * speed * dt + z = start_z + nz * speed * dt + vx = nx * speed + vy = ny * speed + vz = nz * speed + yaw = math.atan2(ny, nx) + nav_state = NAV_LAND + + # Impact: close to target or above ground + cur_dist = math.sqrt((TARGET[0] - x) ** 2 + (TARGET[1] - y) ** 2 + (TARGET[2] - z) ** 2) + if cur_dist < 2.0 or z > -1.0: + return None + + # Drone 12 failure behavior (attitude wobble from t=60) + if drone_id == 12 and t >= 60.0: + wobble_t = t - 60.0 + wobble_amp = min(wobble_t * 0.1, 0.8) # growing oscillation + roll = wobble_amp * math.sin(wobble_t * 5.0) + pitch = wobble_amp * math.cos(wobble_t * 4.0) + # drift sideways + x += wobble_t * 2.0 + y += wobble_t * 1.5 + z += wobble_t * 0.5 # descending + # Build quaternion with roll/pitch/yaw + cr, sr = math.cos(roll / 2), math.sin(roll / 2) + cp, sp = math.cos(pitch / 2), math.sin(pitch / 2) + cy, sy = math.cos(yaw / 2), math.sin(yaw / 2) + q = ( + cr * cp * cy + sr * sp * sy, + sr * cp * cy - cr * sp * sy, + cr * sp * cy + sr * cp * sy, + cr * cp * sy - sr * sp * cy, + ) + return (x, y, z, vx, vy, vz, q, nav_state) + + return (x, y, z, vx, vy, vz, quat_from_yaw(yaw), nav_state) + + +# --------------------------------------------------------------------------- +# Generate a single drone ULog +# --------------------------------------------------------------------------- + +def generate_drone_ulog(drone_id: int, num_drones: int, duration: float, + output_dir: str, scenario: str): + filename = os.path.join(output_dir, f"drone_{drone_id:02d}.ulg") + start_us = drone_id * 500_000 # stagger by 0.5s in timestamps + + writer = ULogWriter(filename, timestamp_us=start_us) + + # Write format definitions + writer.write_format(FMT_LOCAL_POS) + writer.write_format(FMT_ATTITUDE) + writer.write_format(FMT_GLOBAL_POS) + writer.write_format(FMT_STATUS) + + # Write info messages + writer.write_info("sys_name", "PX4") + writer.write_info("ver_hw", "SITL") + writer.write_info("scenario", scenario) + writer.write_info("drone_id", str(drone_id)) + + # Add subscriptions + writer.add_subscription("vehicle_local_position") + writer.add_subscription("vehicle_attitude") + writer.add_subscription("vehicle_global_position") + writer.add_subscription("vehicle_status") + + # Determine rates per drone (edge cases) + pos_dt = 0.02 # 50 Hz + att_dt = 0.01 # 100 Hz + if drone_id == 13: + pos_dt = 0.1 # 10 Hz low rate + if drone_id == 14: + att_dt = 0.005 # 200 Hz high rate + + # Generate data + # We iterate at the finest resolution needed and emit when due + fine_dt = min(pos_dt, att_dt, 0.001) + # Use the finest rate needed for this drone + fine_dt = min(pos_dt, att_dt) + + status_interval = 1.0 # 1 Hz + next_pos_t = 0.0 + next_att_t = 0.0 + next_status_t = 0.0 + last_nav_state = -1 + + t = 0.0 + samples_written = 0 + while t <= duration: + state = compute_state(drone_id, t, num_drones, duration) + + # Drone 15 data gap at t=50-52 + in_gap = (drone_id == 15 and 50.0 <= t <= 52.0) + + if state is not None and not in_gap: + x, y, z, vx, vy, vz, q, nav_state = state + ts = int(t * 1_000_000) + + # Position data + if t >= next_pos_t: + writer.write_data("vehicle_local_position", + pack_vehicle_local_position( + ts, x, y, z, vx, vy, vz, + REF_LAT, REF_LON, REF_ALT)) + lat, lon, alt = local_to_global(x, y, z) + writer.write_data("vehicle_global_position", + pack_vehicle_global_position(ts, lat, lon, alt)) + next_pos_t = t + pos_dt + samples_written += 2 + + # Attitude data + if t >= next_att_t: + writer.write_data("vehicle_attitude", + pack_vehicle_attitude(ts, q)) + next_att_t = t + att_dt + samples_written += 1 + + # Status data + if t >= next_status_t or nav_state != last_nav_state: + writer.write_data("vehicle_status", + pack_vehicle_status(ts, 2, nav_state)) + next_status_t = t + status_interval + last_nav_state = nav_state + samples_written += 1 + + t += fine_dt + + writer.close() + return samples_written + + +# --------------------------------------------------------------------------- +# Scenario metadata +# --------------------------------------------------------------------------- + +def write_scenario_json(output_dir: str, scenario: str, num_drones: int, duration: float): + meta = { + "scenario": scenario, + "num_drones": num_drones, + "duration_s": duration, + "origin": { + "lat": REF_LAT, + "lon": REF_LON, + "alt": REF_ALT, + }, + "phases": [ + {"name": "staggered_start", "start_s": 0, "end_s": 10}, + {"name": "grid_takeoff", "start_s": 10, "end_s": 25}, + {"name": "grid_to_ring", "start_s": 25, "end_s": 40}, + {"name": "ring_cruise", "start_s": 40, "end_s": 55}, + {"name": "kamikaze_dive", "start_s": 55, "end_s": 75}, + ], + "edge_cases": { + "drone_12": "failure at t=60, attitude oscillation, crash at t=70", + "drone_13": "low-rate position data (10Hz)", + "drone_14": "high-rate attitude data (200Hz)", + "drone_15": "2-second data gap at t=50-52", + }, + "files": [f"drone_{i:02d}.ulg" for i in range(num_drones)], + } + path = os.path.join(output_dir, "scenario.json") + with open(path, 'w') as f: + json.dump(meta, f, indent=2) + print(f" Wrote {path}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser( + description="Generate synthetic ULog files for swarm replay testing.") + parser.add_argument("--output-dir", default="synthetic_swarm_ulogs", + help="Output directory (default: synthetic_swarm_ulogs)") + parser.add_argument("--num-drones", type=int, default=16, + help="Number of drones (default: 16)") + parser.add_argument("--duration", type=float, default=90.0, + help="Scenario duration in seconds (default: 90)") + parser.add_argument("--scenario", default="hawk_descent", + help="Scenario name (default: hawk_descent)") + args = parser.parse_args() + + os.makedirs(args.output_dir, exist_ok=True) + print(f"Generating {args.num_drones} drone ULog files for scenario '{args.scenario}'") + print(f" Duration: {args.duration}s") + print(f" Output: {os.path.abspath(args.output_dir)}/") + print() + + for i in range(args.num_drones): + samples = generate_drone_ulog(i, args.num_drones, args.duration, + args.output_dir, args.scenario) + tag = "" + if i == 12: + tag = " [failure drone]" + elif i == 13: + tag = " [low-rate pos]" + elif i == 14: + tag = " [high-rate att]" + elif i == 15: + tag = " [data gap]" + print(f" drone_{i:02d}.ulg ({samples} messages){tag}") + + write_scenario_json(args.output_dir, args.scenario, args.num_drones, args.duration) + print() + print(f"Done. {args.num_drones} ULog files written to {os.path.abspath(args.output_dir)}/") + + +if __name__ == "__main__": + main() diff --git a/Tools/simulation/sih_swarm_run.sh b/Tools/simulation/sih_swarm_run.sh new file mode 100755 index 0000000000..c457516a0d --- /dev/null +++ b/Tools/simulation/sih_swarm_run.sh @@ -0,0 +1,141 @@ +#!/bin/bash +# Launch N SIH quadrotor instances with XRCE-DDS agent. +# Usage: ./sih_swarm_run.sh [CONFIG_YAML] [SPEED_FACTOR] +# +# Requires: build/px4_sitl_sih (make px4_sitl_sih sihsim_quadx_vision) + +set -euo pipefail + +ulimit -n 4096 + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +src_path="$SCRIPT_DIR/../../" +build_path="${src_path}/build/px4_sitl_sih" + +CONFIG_YAML="${1:-${SCRIPT_DIR}/swarm_formation.yaml}" +SPEED_FACTOR="${2:-1}" + +if [ ! -f "$CONFIG_YAML" ]; then + echo "ERROR: config not found: $CONFIG_YAML" + exit 1 +fi + +if [ ! -d "$build_path" ]; then + echo "ERROR: build directory not found: $build_path" + echo "Run: make px4_sitl_sih" + exit 1 +fi + +# --- Parse YAML config via Python --- +eval "$(python3 -c " +import yaml, sys +with open('${CONFIG_YAML}') as f: + cfg = yaml.safe_load(f) +origin = cfg['origin'] +print(f\"ORIGIN_LAT={origin['lat']}\") +print(f\"ORIGIN_LON={origin['lon']}\") +print(f\"ORIGIN_ALT={origin['alt']}\") +target = cfg['target'] +print(f\"TARGET_X={target['x']}\") +print(f\"TARGET_Y={target['y']}\") +print(f\"TARGET_Z={target['z']}\") +drones = cfg['formation'] +print(f\"DRONE_COUNT={len(drones)}\") +for i, d in enumerate(drones): + print(f\"DRONE_{i}_ID={d['id']}\") + print(f\"DRONE_{i}_X={d['x']}\") + print(f\"DRONE_{i}_Y={d['y']}\") + print(f\"DRONE_{i}_Z={d['z']}\") +")" + +echo "Swarm config: $DRONE_COUNT drones, origin ($ORIGIN_LAT, $ORIGIN_LON, ${ORIGIN_ALT}m)" +echo "Target: ($TARGET_X, $TARGET_Y, $TARGET_Z)" +echo "Speed factor: $SPEED_FACTOR" +echo "" + +# --- Cleanup --- +CHILD_PIDS=() +AGENT_PID="" + +cleanup() { + echo "" + echo "Shutting down..." + for pid in "${CHILD_PIDS[@]}"; do + kill "$pid" 2>/dev/null || true + done + if [ -n "$AGENT_PID" ]; then + kill "$AGENT_PID" 2>/dev/null || true + fi + pkill -x px4 2>/dev/null || true + wait 2>/dev/null + echo "All processes stopped." +} + +trap cleanup SIGINT SIGTERM + +# --- Kill existing instances --- +echo "Killing running instances..." +pkill -x px4 2>/dev/null || true +sleep 1 + +# --- Start XRCE-DDS agent (if available) --- +if command -v MicroXRCEAgent &>/dev/null; then + echo "Starting MicroXRCEAgent on UDP port 8888..." + MicroXRCEAgent udp4 -p 8888 > /dev/null 2>&1 & + AGENT_PID=$! + echo "XRCE-DDS agent PID: $AGENT_PID" +else + echo "WARNING: MicroXRCEAgent not found on host." + echo " Start it separately (e.g. from Docker container):" + echo " MicroXRCEAgent udp4 -p 8888" +fi +echo "" + +# --- Port map --- +printf "%-10s %-12s %-14s %-14s\n" "Instance" "MAV_SYS_ID" "DDS Namespace" "Offboard Port" +printf "%-10s %-12s %-14s %-14s\n" "--------" "----------" "-------------" "-------------" +n=0 +while [ $n -lt "$DRONE_COUNT" ]; do + sysid=$((n + 1)) + printf "%-10s %-12s %-14s %-14s\n" "$n" "$sysid" "px4_${n}" "$((14540 + n))" + n=$((n + 1)) +done +echo "" + +# --- Launch instances --- +export PX4_SIM_MODEL=sihsim_quadx_vision + +n=0 +while [ $n -lt "$DRONE_COUNT" ]; do + eval "FX=\$DRONE_${n}_X" + eval "FY=\$DRONE_${n}_Y" + eval "FZ=\$DRONE_${n}_Z" + + working_dir="$build_path/instance_$n" + [ ! -d "$working_dir" ] && mkdir -p "$working_dir" + + export PX4_HOME_LAT="$ORIGIN_LAT" + export PX4_HOME_LON="$ORIGIN_LON" + export PX4_HOME_ALT="$ORIGIN_ALT" + export PX4_FORMATION_X="$FX" + export PX4_FORMATION_Y="$FY" + export PX4_FORMATION_Z="$FZ" + export PX4_TARGET_X="$TARGET_X" + export PX4_TARGET_Y="$TARGET_Y" + export PX4_TARGET_Z="$TARGET_Z" + export PX4_SIM_SPEED_FACTOR="$SPEED_FACTOR" + # Force consistent DDS namespace for all instances (including 0) + export PX4_UXRCE_DDS_NS="px4_$n" + + pushd "$working_dir" &>/dev/null + echo "Starting instance $n (formation: $FX, $FY, $FZ)" + "$build_path/bin/px4" -i "$n" -d "$build_path/etc" >out.log 2>err.log & + CHILD_PIDS+=($!) + popd &>/dev/null + + n=$((n + 1)) +done + +echo "" +echo "All $DRONE_COUNT instances launched. Press Ctrl-C to stop." +wait diff --git a/Tools/simulation/swarm_formation.yaml b/Tools/simulation/swarm_formation.yaml new file mode 100644 index 0000000000..5f1901a2d1 --- /dev/null +++ b/Tools/simulation/swarm_formation.yaml @@ -0,0 +1,32 @@ +# Formation config for SIH swarm simulation +# Used by sih_swarm_run.sh and swarm_ros2 coordinator + +origin: + lat: 47.397742 + lon: 8.545594 + alt: 488.0 + +formation: + # NED offsets from origin (meters). 4x4 grid, 1m spacing, at 20m altitude + - {id: 0, x: 0.0, y: 0.0, z: -20.0} + - {id: 1, x: 0.0, y: 1.0, z: -20.0} + - {id: 2, x: 0.0, y: 2.0, z: -20.0} + - {id: 3, x: 0.0, y: 3.0, z: -20.0} + - {id: 4, x: 1.0, y: 0.0, z: -20.0} + - {id: 5, x: 1.0, y: 1.0, z: -20.0} + - {id: 6, x: 1.0, y: 2.0, z: -20.0} + - {id: 7, x: 1.0, y: 3.0, z: -20.0} + - {id: 8, x: 2.0, y: 0.0, z: -20.0} + - {id: 9, x: 2.0, y: 1.0, z: -20.0} + - {id: 10, x: 2.0, y: 2.0, z: -20.0} + - {id: 11, x: 2.0, y: 3.0, z: -20.0} + - {id: 12, x: 3.0, y: 0.0, z: -20.0} + - {id: 13, x: 3.0, y: 1.0, z: -20.0} + - {id: 14, x: 3.0, y: 2.0, z: -20.0} + - {id: 15, x: 3.0, y: 3.0, z: -20.0} + +target: + # Car position: 200m north of origin, ground level + x: 200.0 + y: 0.0 + z: 0.0 diff --git a/Tools/simulation/swarm_formation_designer.py b/Tools/simulation/swarm_formation_designer.py new file mode 100755 index 0000000000..8394ab43ff --- /dev/null +++ b/Tools/simulation/swarm_formation_designer.py @@ -0,0 +1,485 @@ +#!/usr/bin/env python3 +""" +Swarm Formation Designer - Create and preview formation YAML files for PX4 SIH swarm simulation. + +Generates swarm_formation.yaml files with preset formation patterns (grid, circle, line, +v, diamond, random) for use with sih_swarm_run.sh and swarm_ros2 coordinator. + +Usage examples: + # 16-drone grid with 2m spacing, preview in terminal + python3 swarm_formation_designer.py --pattern grid --count 16 --spacing 2.0 --preview + + # 8-drone circle, radius 10m, write to file + python3 swarm_formation_designer.py --pattern circle --count 8 --radius 10 -o formation.yaml + + # V formation with custom origin and target + python3 swarm_formation_designer.py --pattern v --count 7 --spacing 3.0 \\ + --origin-lat 37.4 --origin-lon -122.0 --target-north 150 --target-east 50 +""" + +import argparse +import math +import random +import sys +from typing import List, Tuple + +# Defaults matching PX4 SIH test site (Zurich area) +DEFAULT_LAT = 47.397742 +DEFAULT_LON = 8.545594 +DEFAULT_ALT = 488.0 +DEFAULT_ALTITUDE = 20.0 +DEFAULT_SPACING = 1.0 +DEFAULT_RADIUS = 5.0 +DEFAULT_COUNT = 16 +DEFAULT_TARGET_NORTH = 200.0 +DEFAULT_TARGET_EAST = 0.0 +MIN_SEPARATION = 0.5 # meters, minimum allowed distance between any two drones + + +def generate_grid(count: int, spacing: float, altitude: float) -> List[dict]: + """Arrange drones in a square grid pattern (ceil(sqrt(count)) columns).""" + cols = math.ceil(math.sqrt(count)) + positions = [] + for i in range(count): + row = i // cols + col = i % cols + positions.append({ + "id": i, + "x": round(row * spacing, 2), + "y": round(col * spacing, 2), + "z": round(-altitude, 2), + }) + return positions + + +def generate_circle(count: int, radius: float, altitude: float) -> List[dict]: + """Arrange drones evenly spaced around a circle.""" + positions = [] + for i in range(count): + angle = 2.0 * math.pi * i / count + positions.append({ + "id": i, + "x": round(radius * math.cos(angle), 2), + "y": round(radius * math.sin(angle), 2), + "z": round(-altitude, 2), + }) + return positions + + +def generate_line(count: int, spacing: float, altitude: float) -> List[dict]: + """Arrange drones in a line along the east (y) axis, centered at origin.""" + positions = [] + offset = (count - 1) * spacing / 2.0 + for i in range(count): + positions.append({ + "id": i, + "x": 0.0, + "y": round(i * spacing - offset, 2), + "z": round(-altitude, 2), + }) + return positions + + +def generate_v(count: int, spacing: float, altitude: float) -> List[dict]: + """Classic V formation with leader at front (northernmost position). + + Leader is id 0 at the tip. Remaining drones alternate left/right, + each row stepping back (south) and outward (east/west). + """ + positions = [{"id": 0, "x": 0.0, "y": 0.0, "z": round(-altitude, 2)}] + wing_index = 1 + for i in range(1, count): + side = 1 if (i % 2 == 1) else -1 + row = (i + 1) // 2 + positions.append({ + "id": wing_index, + "x": round(-row * spacing, 2), # step back (south) + "y": round(side * row * spacing, 2), # spread east/west + "z": round(-altitude, 2), + }) + wing_index += 1 + return positions + + +def generate_diamond(count: int, spacing: float, altitude: float) -> List[dict]: + """Diamond/rhombus formation. + + Builds concentric diamond rings outward from the center. Ring k has 4*k + positions (except the center which has 1). Drones fill from center outward. + """ + # Generate diamond coordinates ring by ring + coords: List[Tuple[float, float]] = [(0.0, 0.0)] + ring = 1 + while len(coords) < count: + # Each ring has 4 sides, each side has 'ring' segments + for side in range(4): + for step in range(ring): + if side == 0: # top-right edge + x = (ring - step) * spacing + y = step * spacing + elif side == 1: # bottom-right edge + x = -step * spacing + y = (ring - step) * spacing + elif side == 2: # bottom-left edge + x = -(ring - step) * spacing + y = -step * spacing + else: # top-left edge + x = step * spacing + y = -(ring - step) * spacing + coords.append((round(x, 2), round(y, 2))) + if len(coords) >= count: + break + if len(coords) >= count: + break + ring += 1 + + positions = [] + for i in range(count): + positions.append({ + "id": i, + "x": coords[i][0], + "y": coords[i][1], + "z": round(-altitude, 2), + }) + return positions + + +def generate_random(count: int, spacing: float, altitude: float) -> List[dict]: + """Random positions within a bounding box, enforcing minimum separation. + + The bounding box size scales with count and spacing. Uses rejection + sampling to guarantee minimum separation between all drones. + """ + box_size = max(spacing * math.sqrt(count) * 2, spacing * 4) + positions = [] + coords: List[Tuple[float, float]] = [] + max_attempts = count * 1000 + + rng = random.Random(42) # deterministic for reproducibility + attempts = 0 + while len(coords) < count and attempts < max_attempts: + x = round(rng.uniform(-box_size / 2, box_size / 2), 2) + y = round(rng.uniform(-box_size / 2, box_size / 2), 2) + too_close = False + for cx, cy in coords: + if math.sqrt((x - cx) ** 2 + (y - cy) ** 2) < MIN_SEPARATION: + too_close = True + break + if not too_close: + coords.append((x, y)) + attempts += 1 + + if len(coords) < count: + print( + f"Warning: could only place {len(coords)}/{count} drones " + f"with minimum separation {MIN_SEPARATION}m in bounding box {box_size:.1f}m", + file=sys.stderr, + ) + + for i, (x, y) in enumerate(coords): + positions.append({ + "id": i, + "x": x, + "y": y, + "z": round(-altitude, 2), + }) + return positions + + +PATTERN_GENERATORS = { + "grid": generate_grid, + "circle": generate_circle, + "line": generate_line, + "v": generate_v, + "diamond": generate_diamond, + "random": generate_random, +} + + +def validate_no_overlap(positions: List[dict]) -> bool: + """Check that no two drones occupy the same horizontal position (within MIN_SEPARATION).""" + for i in range(len(positions)): + for j in range(i + 1, len(positions)): + dx = positions[i]["x"] - positions[j]["x"] + dy = positions[i]["y"] - positions[j]["y"] + dist = math.sqrt(dx * dx + dy * dy) + if dist < MIN_SEPARATION: + print( + f"Error: drones {positions[i]['id']} and {positions[j]['id']} " + f"are only {dist:.2f}m apart (minimum: {MIN_SEPARATION}m)", + file=sys.stderr, + ) + return False + return True + + +def build_formation_dict( + positions: List[dict], + origin_lat: float, + origin_lon: float, + origin_alt: float, + target_north: float, + target_east: float, +) -> dict: + """Build the formation dictionary matching swarm_formation.yaml structure.""" + return { + "origin": { + "lat": origin_lat, + "lon": origin_lon, + "alt": origin_alt, + }, + "formation": positions, + "target": { + "x": target_north, + "y": target_east, + "z": 0.0, + }, + } + + +def format_yaml(data: dict) -> str: + """Format the formation data as YAML with flow-style formation entries. + + Produces output matching the established swarm_formation.yaml style with + inline dicts for formation entries and block style for origin/target. + """ + lines = [ + "# Formation config for SIH swarm simulation", + "# Generated by swarm_formation_designer.py", + "", + "origin:", + f" lat: {data['origin']['lat']}", + f" lon: {data['origin']['lon']}", + f" alt: {data['origin']['alt']}", + "", + "formation:", + ] + + # Determine padding width for aligned id field + max_id = max(p["id"] for p in data["formation"]) + id_width = len(str(max_id)) + + for pos in data["formation"]: + id_str = str(pos["id"]).rjust(id_width) + lines.append( + f" - {{id: {id_str}, " + f"x: {pos['x']}, " + f"y: {pos['y']}, " + f"z: {pos['z']}}}" + ) + + lines.extend([ + "", + "target:", + f" x: {data['target']['x']}", + f" y: {data['target']['y']}", + f" z: {data['target']['z']}", + "", + ]) + return "\n".join(lines) + + +def ascii_preview(positions: List[dict], target_north: float, target_east: float) -> str: + """Render an ASCII top-down map showing drone positions and target. + + North is up, East is right. Axes are labeled. The target is shown as 'T' + and drones as their id number (or '*' if id >= 10). + """ + width = 60 + height = 30 + + # Collect all points (drones + target) + all_x = [p["x"] for p in positions] + [target_north] + all_y = [p["y"] for p in positions] + [target_east] + + min_x, max_x = min(all_x), max(all_x) + min_y, max_y = min(all_y), max(all_y) + + # Add margin + range_x = max_x - min_x if max_x != min_x else 1.0 + range_y = max_y - min_y if max_y != min_y else 1.0 + margin = 0.1 + min_x -= range_x * margin + max_x += range_x * margin + min_y -= range_y * margin + max_y += range_y * margin + range_x = max_x - min_x + range_y = max_y - min_y + + def to_grid(north: float, east: float) -> Tuple[int, int]: + col = int((east - min_y) / range_y * (width - 1)) + row = int((max_x - north) / range_x * (height - 1)) # north is up + col = max(0, min(width - 1, col)) + row = max(0, min(height - 1, row)) + return row, col + + # Initialize grid + grid = [[" " for _ in range(width)] for _ in range(height)] + + # Place target + tr, tc = to_grid(target_north, target_east) + grid[tr][tc] = "T" + + # Place drones (overwrite target if on same cell) + for pos in positions: + r, c = to_grid(pos["x"], pos["y"]) + label = str(pos["id"]) if pos["id"] < 10 else "*" + grid[r][c] = label + + # Build output + out = [] + out.append(f" Formation Preview ({len(positions)} drones)") + out.append(f" North ^ (x range: {min_x:.1f} to {max_x:.1f} m)") + out.append(" " + "-" * (width + 2)) + for row in grid: + out.append(" |" + "".join(row) + "|") + out.append(" " + "-" * (width + 2)) + out.append(f" East -> (y range: {min_y:.1f} to {max_y:.1f} m)") + out.append("") + out.append(" Legend: 0-9 = drone id, * = drone id >= 10, T = target") + return "\n".join(out) + + +def parse_args(argv: "List[str] | None" = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Create and preview swarm formation YAML files for PX4 SIH simulation.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Examples:\n" + " %(prog)s --pattern grid --count 16 --spacing 2 --preview\n" + " %(prog)s --pattern circle --count 8 --radius 10 -o my_formation.yaml\n" + " %(prog)s --pattern v --count 7 --spacing 3 --preview -o v_formation.yaml\n" + ), + ) + parser.add_argument( + "--pattern", + choices=list(PATTERN_GENERATORS.keys()), + default="grid", + help="Formation pattern (default: grid)", + ) + parser.add_argument( + "--count", + type=int, + default=DEFAULT_COUNT, + help=f"Number of drones (default: {DEFAULT_COUNT})", + ) + parser.add_argument( + "--spacing", + type=float, + default=DEFAULT_SPACING, + help=f"Spacing between drones in meters (default: {DEFAULT_SPACING})", + ) + parser.add_argument( + "--altitude", + type=float, + default=DEFAULT_ALTITUDE, + help=f"Flight altitude in meters AGL (default: {DEFAULT_ALTITUDE})", + ) + parser.add_argument( + "--radius", + type=float, + default=DEFAULT_RADIUS, + help=f"Radius for circle pattern in meters (default: {DEFAULT_RADIUS})", + ) + parser.add_argument( + "--origin-lat", + type=float, + default=DEFAULT_LAT, + help=f"Origin latitude (default: {DEFAULT_LAT})", + ) + parser.add_argument( + "--origin-lon", + type=float, + default=DEFAULT_LON, + help=f"Origin longitude (default: {DEFAULT_LON})", + ) + parser.add_argument( + "--origin-alt", + type=float, + default=DEFAULT_ALT, + help=f"Origin altitude AMSL in meters (default: {DEFAULT_ALT})", + ) + parser.add_argument( + "--target-north", + type=float, + default=DEFAULT_TARGET_NORTH, + help=f"Target position north of origin in meters (default: {DEFAULT_TARGET_NORTH})", + ) + parser.add_argument( + "--target-east", + type=float, + default=DEFAULT_TARGET_EAST, + help=f"Target position east of origin in meters (default: {DEFAULT_TARGET_EAST})", + ) + parser.add_argument( + "--preview", + action="store_true", + help="Show ASCII preview of the formation", + ) + parser.add_argument( + "-o", "--output", + type=str, + default=None, + help="Output YAML file path (default: print to stdout)", + ) + + args = parser.parse_args(argv) + + if args.count < 1: + parser.error("--count must be at least 1") + if args.spacing <= 0: + parser.error("--spacing must be positive") + if args.altitude <= 0: + parser.error("--altitude must be positive") + if args.radius <= 0: + parser.error("--radius must be positive") + + return args + + +def main(argv: "List[str] | None" = None) -> int: + args = parse_args(argv) + + # Select generator and dispatch with appropriate arguments + generator = PATTERN_GENERATORS[args.pattern] + if args.pattern == "circle": + positions = generator(args.count, args.radius, args.altitude) + else: + positions = generator(args.count, args.spacing, args.altitude) + + # Validate + if not validate_no_overlap(positions): + return 1 + + # Build output + data = build_formation_dict( + positions, + args.origin_lat, + args.origin_lon, + args.origin_alt, + args.target_north, + args.target_east, + ) + yaml_text = format_yaml(data) + + # Preview + if args.preview: + print(ascii_preview(positions, args.target_north, args.target_east)) + print() + + # Output + if args.output: + with open(args.output, "w") as f: + f.write(yaml_text) + print(f"Wrote {len(positions)} drone formation to {args.output}") + else: + if not args.preview: + print(yaml_text, end="") + else: + print(yaml_text, end="") + + return 0 + + +if __name__ == "__main__": + sys.exit(main())