feat(simulation): add swarm infrastructure scripts

- swarm_formation.yaml: default 16-drone 4x4 grid formation config
- swarm_formation_designer.py: CLI to generate/preview formation YAML
  files with grid, circle, line, v, diamond, and random patterns
- sih_swarm_run.sh: launcher for N SIH instances with XRCE-DDS agent,
  reads formation YAML for per-drone positions
- collect_swarm_ulogs.py: collects ULog files from all instances into
  a single output directory with drone ID prefixes
- generate_swarm_ulogs.py: synthetic ULog generator for viewer testing
  with staggered starts, formation phases, and edge cases

Signed-off-by: Ramon Roche <mrpollo@gmail.com>
This commit is contained in:
Ramon Roche 2026-03-16 07:44:32 -07:00
parent 2ae4ca4bf4
commit 8bb863f417
No known key found for this signature in database
GPG Key ID: 275988FAE5821713
5 changed files with 1287 additions and 0 deletions

View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""Collect ULog files from SIH swarm simulation instances.
Gathers .ulg files from per-instance log directories and copies them
into a single output folder with filenames that identify the source instance.
"""
import argparse
import glob
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
def find_repo_root() -> Path:
"""Walk up from script location to find the repository root."""
path = Path(__file__).resolve().parent
while path != path.parent:
if (path / "ROMFS").is_dir() and (path / "src").is_dir():
return path
path = path.parent
return Path(__file__).resolve().parent.parent.parent
def parse_instance_number(filepath: str) -> int | None:
"""Extract the instance number from a path containing instance_N."""
parts = Path(filepath).parts
for part in parts:
if part.startswith("instance_"):
try:
return int(part.split("_", 1)[1])
except (ValueError, IndexError):
continue
return None
def collect_ulogs(build_dir: Path, output_dir: Path, max_instances: int | None) -> None:
"""Find and copy ULog files from swarm instance directories."""
pattern = str(build_dir / "instance_*" / "log" / "**" / "*.ulg")
ulog_files = sorted(glob.glob(pattern, recursive=True))
if not ulog_files:
print(f"No ULog files found matching: {pattern}")
print()
print("Troubleshooting:")
print(f" - Verify build directory exists: {build_dir}")
print(f" - Check for instance_* subdirectories in {build_dir}")
print(" - Ensure the swarm simulation has run and produced logs")
sys.exit(1)
# Filter by instance count if specified.
if max_instances is not None:
ulog_files = [
f for f in ulog_files
if (n := parse_instance_number(f)) is not None and n < max_instances
]
if not ulog_files:
assert max_instances is not None
print(f"No ULog files found for the requested instance range (0..{max_instances - 1}).")
sys.exit(1)
output_dir.mkdir(parents=True, exist_ok=True)
total_size = 0
instances_seen: set[int] = set()
copied = 0
for src in ulog_files:
instance = parse_instance_number(src)
if instance is None:
print(f" skipping (could not determine instance): {src}")
continue
instances_seen.add(instance)
original_name = Path(src).name
dest_name = f"drone_{instance:02d}_{original_name}"
dest_path = output_dir / dest_name
shutil.copy2(src, dest_path)
file_size = os.path.getsize(src)
total_size += file_size
copied += 1
print(f" [{instance:02d}] {original_name} ({file_size / 1024:.1f} KB)")
print()
print("Summary")
print("-" * 40)
print(f" Files collected : {copied}")
print(f" Instances : {sorted(instances_seen)}")
print(f" Total size : {total_size / (1024 * 1024):.2f} MB")
print(f" Output directory: {output_dir.resolve()}")
def main() -> None:
repo_root = find_repo_root()
default_build = str(repo_root / "build" / "px4_sitl_sih")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
parser = argparse.ArgumentParser(
description="Collect ULog files from SIH swarm simulation instances.",
)
parser.add_argument(
"--build-dir",
default=default_build,
help=f"Path to the build directory (default: {default_build})",
)
parser.add_argument(
"--output-dir",
default=None,
help="Output directory (default: swarm_ulogs_YYYYMMDD_HHMMSS)",
)
parser.add_argument(
"--instances",
type=int,
default=None,
help="Number of instances to collect from (default: auto-detect)",
)
args = parser.parse_args()
build_dir = Path(args.build_dir)
if not build_dir.is_dir():
print(f"Error: build directory does not exist: {build_dir}")
sys.exit(1)
if args.output_dir is not None:
output_dir = Path(args.output_dir)
else:
output_dir = Path(f"swarm_ulogs_{timestamp}")
print(f"Collecting ULog files from: {build_dir}")
print(f"Output directory: {output_dir}")
print()
collect_ulogs(build_dir, output_dir, args.instances)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""Generate synthetic ULog files for testing mavsim-viewer swarm replay.
Creates N drone ULog files with crafted trajectories in valid ULog binary
format, without requiring any real PX4 simulation.
"""
import argparse
import json
import math
import os
import struct
# ---------------------------------------------------------------------------
# ULog binary writer
# ---------------------------------------------------------------------------
ULOG_MAGIC = b'\x55\x4c\x6f\x67\x01\x12\x35'
ULOG_VERSION = 1
class ULogWriter:
"""Write a minimal but valid ULog binary file."""
def __init__(self, path: str, timestamp_us: int = 0):
self._path = path
self._fp = open(path, 'wb')
self._msg_id_counter = 0
self._subscriptions: dict[str, int] = {} # topic_name -> msg_id
self._write_header(timestamp_us)
self._write_flag_bits()
# -- low-level helpers --------------------------------------------------
def _write_raw(self, data: bytes):
self._fp.write(data)
def _write_message(self, msg_type: int, payload: bytes):
"""Write a ULog message: uint16 size + uint8 type + payload."""
self._write_raw(struct.pack('<HB', len(payload), msg_type))
self._write_raw(payload)
def _write_header(self, timestamp_us: int):
self._write_raw(ULOG_MAGIC)
self._write_raw(struct.pack('<BQ', ULOG_VERSION, timestamp_us))
def _write_flag_bits(self):
# 'B' message: 8 compat + 8 incompat + 3*uint64 appended offsets
payload = b'\x00' * 8 + b'\x00' * 8 + struct.pack('<QQQ', 0, 0, 0)
self._write_message(0x42, payload)
# -- definition messages ------------------------------------------------
def write_format(self, fmt_string: str):
"""Write a Format ('F') definition message.
fmt_string example: "vehicle_local_position:uint64_t timestamp;float x;float y"
"""
payload = fmt_string.encode('ascii')
self._write_message(0x46, payload)
def write_info(self, key: str, value: str):
"""Write an Info ('I') message with a string value."""
key_with_type = f"char[{len(value)}] {key}"
key_bytes = key_with_type.encode('ascii')
value_bytes = value.encode('ascii')
payload = struct.pack('<B', len(key_bytes)) + key_bytes + value_bytes
self._write_message(0x49, payload)
def add_subscription(self, topic_name: str, multi_id: int = 0) -> int:
"""Write an AddSubscription ('A') message. Returns the assigned msg_id."""
msg_id = self._msg_id_counter
self._msg_id_counter += 1
self._subscriptions[topic_name] = msg_id
name_bytes = topic_name.encode('ascii')
payload = struct.pack('<BH', multi_id, msg_id) + name_bytes
self._write_message(0x41, payload)
return msg_id
def write_data(self, topic_name: str, data_bytes: bytes):
"""Write a Data ('D') message for a subscribed topic.
data_bytes must be the fully serialized payload (starting with
uint64 timestamp) matching the format definition.
"""
msg_id = self._subscriptions[topic_name]
payload = struct.pack('<H', msg_id) + data_bytes
self._write_message(0x44, payload)
def close(self):
self._fp.close()
# ---------------------------------------------------------------------------
# Topic serialization helpers
# ---------------------------------------------------------------------------
def pack_vehicle_local_position(timestamp_us: int, x: float, y: float, z: float,
vx: float, vy: float, vz: float,
ref_lat: float, ref_lon: float, ref_alt: float) -> bytes:
return struct.pack('<Qffffffddf',
timestamp_us, x, y, z, vx, vy, vz,
ref_lat, ref_lon, ref_alt)
def pack_vehicle_attitude(timestamp_us: int, q: tuple[float, ...]) -> bytes:
return struct.pack('<Qffff', timestamp_us, q[0], q[1], q[2], q[3])
def pack_vehicle_global_position(timestamp_us: int, lat: float, lon: float, alt: float) -> bytes:
return struct.pack('<Qddf', timestamp_us, lat, lon, alt)
def pack_vehicle_status(timestamp_us: int, vehicle_type: int, nav_state: int) -> bytes:
return struct.pack('<QBB', timestamp_us, vehicle_type, nav_state)
# ---------------------------------------------------------------------------
# Format definition strings (must match pack functions above)
# ---------------------------------------------------------------------------
FMT_LOCAL_POS = (
"vehicle_local_position:"
"uint64_t timestamp;"
"float x;float y;float z;"
"float vx;float vy;float vz;"
"double ref_lat;double ref_lon;float ref_alt"
)
FMT_ATTITUDE = (
"vehicle_attitude:"
"uint64_t timestamp;"
"float[4] q"
)
FMT_GLOBAL_POS = (
"vehicle_global_position:"
"uint64_t timestamp;"
"double lat;double lon;float alt"
)
FMT_STATUS = (
"vehicle_status:"
"uint64_t timestamp;"
"uint8_t vehicle_type;uint8_t nav_state"
)
# ---------------------------------------------------------------------------
# Trajectory math
# ---------------------------------------------------------------------------
REF_LAT = 47.397742
REF_LON = 8.545594
REF_ALT = 488.0
NAV_TAKEOFF = 2
NAV_OFFBOARD = 14
NAV_LAND = 18
def quat_from_yaw(yaw: float) -> tuple[float, float, float, float]:
return (math.cos(yaw / 2.0), 0.0, 0.0, math.sin(yaw / 2.0))
def smooth_interp(t: float) -> float:
"""Cosine interpolation factor 0..1."""
return 0.5 - 0.5 * math.cos(math.pi * max(0.0, min(1.0, t)))
def grid_position(drone_id: int) -> tuple[float, float]:
"""4x4 grid, 3m spacing, centered on origin."""
row = drone_id // 4
col = drone_id % 4
x = (row - 1.5) * 3.0
y = (col - 1.5) * 3.0
return x, y
def circle_position(drone_id: int, num_drones: int, radius: float = 30.0) -> tuple[float, float]:
angle = 2.0 * math.pi * drone_id / num_drones
return radius * math.cos(angle), radius * math.sin(angle)
def local_to_global(x: float, y: float, z: float) -> tuple[float, float, float]:
lat = REF_LAT + (x / 111320.0)
lon = REF_LON + (y / (111320.0 * math.cos(math.radians(REF_LAT))))
alt = REF_ALT - z
return lat, lon, alt
# ---------------------------------------------------------------------------
# Phase computations
# ---------------------------------------------------------------------------
TARGET = (200.0, 0.0, 0.0) # NED target for kamikaze
def compute_state(drone_id: int, t: float, num_drones: int, _duration: float):
"""Return (x, y, z, vx, vy, vz, yaw, nav_state) for a drone at time t.
Returns None if the drone hasn't started yet or has impacted.
"""
start_time = drone_id * 0.5 # staggered start
if t < start_time:
return None
# Failure drone 12 -- crash at t=70, log ends
if drone_id == 12 and t > 70.0:
return None
# Default nav state
nav_state = NAV_OFFBOARD
yaw = 0.0
vx, vy, vz = 0.0, 0.0, 0.0
gx, gy = grid_position(drone_id)
cx, cy = circle_position(drone_id, num_drones)
if t < 10.0:
# Phase 1: staggered start, on ground rising slightly
frac = smooth_interp((t - start_time) / max(0.01, 10.0 - start_time))
x, y = gx * frac, gy * frac
z = 0.0
nav_state = NAV_TAKEOFF
elif t < 25.0:
# Phase 2: grid takeoff, ascend to -20 NED
frac = smooth_interp((t - 10.0) / 15.0)
x, y = gx, gy
z = -20.0 * frac
vz = -20.0 / 15.0 if frac < 1.0 else 0.0
nav_state = NAV_TAKEOFF
elif t < 40.0:
# Phase 3: grid -> ring morph
frac = smooth_interp((t - 25.0) / 15.0)
x = gx + (cx - gx) * frac
y = gy + (cy - gy) * frac
z = -20.0
vx = (cx - gx) / 15.0
vy = (cy - gy) / 15.0
nav_state = NAV_OFFBOARD
elif t < 55.0:
# Phase 4: ring cruise CW at 3 m/s
angular_speed = 3.0 / 30.0 # omega = v/r
base_angle = 2.0 * math.pi * drone_id / num_drones
angle = base_angle - angular_speed * (t - 40.0) # CW -> negative
x = 30.0 * math.cos(angle)
y = 30.0 * math.sin(angle)
z = -20.0
vx = 30.0 * angular_speed * math.sin(angle)
vy = -30.0 * angular_speed * math.cos(angle)
yaw = math.atan2(-y, -x) # face center
nav_state = NAV_OFFBOARD
else:
# Phase 5/6: Kamikaze dive toward target
# Starting position at t=55
base_angle = 2.0 * math.pi * drone_id / num_drones
angular_speed = 3.0 / 30.0
angle55 = base_angle - angular_speed * 15.0
start_x = 30.0 * math.cos(angle55)
start_y = 30.0 * math.sin(angle55)
start_z = -20.0
dx = TARGET[0] - start_x
dy = TARGET[1] - start_y
dz = TARGET[2] - start_z
dist = math.sqrt(dx * dx + dy * dy + dz * dz)
speed = 12.0 # m/s max
nx, ny, nz = dx / dist, dy / dist, dz / dist
dt = t - 55.0
x = start_x + nx * speed * dt
y = start_y + ny * speed * dt
z = start_z + nz * speed * dt
vx = nx * speed
vy = ny * speed
vz = nz * speed
yaw = math.atan2(ny, nx)
nav_state = NAV_LAND
# Impact: close to target or above ground
cur_dist = math.sqrt((TARGET[0] - x) ** 2 + (TARGET[1] - y) ** 2 + (TARGET[2] - z) ** 2)
if cur_dist < 2.0 or z > -1.0:
return None
# Drone 12 failure behavior (attitude wobble from t=60)
if drone_id == 12 and t >= 60.0:
wobble_t = t - 60.0
wobble_amp = min(wobble_t * 0.1, 0.8) # growing oscillation
roll = wobble_amp * math.sin(wobble_t * 5.0)
pitch = wobble_amp * math.cos(wobble_t * 4.0)
# drift sideways
x += wobble_t * 2.0
y += wobble_t * 1.5
z += wobble_t * 0.5 # descending
# Build quaternion with roll/pitch/yaw
cr, sr = math.cos(roll / 2), math.sin(roll / 2)
cp, sp = math.cos(pitch / 2), math.sin(pitch / 2)
cy, sy = math.cos(yaw / 2), math.sin(yaw / 2)
q = (
cr * cp * cy + sr * sp * sy,
sr * cp * cy - cr * sp * sy,
cr * sp * cy + sr * cp * sy,
cr * cp * sy - sr * sp * cy,
)
return (x, y, z, vx, vy, vz, q, nav_state)
return (x, y, z, vx, vy, vz, quat_from_yaw(yaw), nav_state)
# ---------------------------------------------------------------------------
# Generate a single drone ULog
# ---------------------------------------------------------------------------
def generate_drone_ulog(drone_id: int, num_drones: int, duration: float,
output_dir: str, scenario: str):
filename = os.path.join(output_dir, f"drone_{drone_id:02d}.ulg")
start_us = drone_id * 500_000 # stagger by 0.5s in timestamps
writer = ULogWriter(filename, timestamp_us=start_us)
# Write format definitions
writer.write_format(FMT_LOCAL_POS)
writer.write_format(FMT_ATTITUDE)
writer.write_format(FMT_GLOBAL_POS)
writer.write_format(FMT_STATUS)
# Write info messages
writer.write_info("sys_name", "PX4")
writer.write_info("ver_hw", "SITL")
writer.write_info("scenario", scenario)
writer.write_info("drone_id", str(drone_id))
# Add subscriptions
writer.add_subscription("vehicle_local_position")
writer.add_subscription("vehicle_attitude")
writer.add_subscription("vehicle_global_position")
writer.add_subscription("vehicle_status")
# Determine rates per drone (edge cases)
pos_dt = 0.02 # 50 Hz
att_dt = 0.01 # 100 Hz
if drone_id == 13:
pos_dt = 0.1 # 10 Hz low rate
if drone_id == 14:
att_dt = 0.005 # 200 Hz high rate
# Generate data
# We iterate at the finest resolution needed and emit when due
fine_dt = min(pos_dt, att_dt, 0.001)
# Use the finest rate needed for this drone
fine_dt = min(pos_dt, att_dt)
status_interval = 1.0 # 1 Hz
next_pos_t = 0.0
next_att_t = 0.0
next_status_t = 0.0
last_nav_state = -1
t = 0.0
samples_written = 0
while t <= duration:
state = compute_state(drone_id, t, num_drones, duration)
# Drone 15 data gap at t=50-52
in_gap = (drone_id == 15 and 50.0 <= t <= 52.0)
if state is not None and not in_gap:
x, y, z, vx, vy, vz, q, nav_state = state
ts = int(t * 1_000_000)
# Position data
if t >= next_pos_t:
writer.write_data("vehicle_local_position",
pack_vehicle_local_position(
ts, x, y, z, vx, vy, vz,
REF_LAT, REF_LON, REF_ALT))
lat, lon, alt = local_to_global(x, y, z)
writer.write_data("vehicle_global_position",
pack_vehicle_global_position(ts, lat, lon, alt))
next_pos_t = t + pos_dt
samples_written += 2
# Attitude data
if t >= next_att_t:
writer.write_data("vehicle_attitude",
pack_vehicle_attitude(ts, q))
next_att_t = t + att_dt
samples_written += 1
# Status data
if t >= next_status_t or nav_state != last_nav_state:
writer.write_data("vehicle_status",
pack_vehicle_status(ts, 2, nav_state))
next_status_t = t + status_interval
last_nav_state = nav_state
samples_written += 1
t += fine_dt
writer.close()
return samples_written
# ---------------------------------------------------------------------------
# Scenario metadata
# ---------------------------------------------------------------------------
def write_scenario_json(output_dir: str, scenario: str, num_drones: int, duration: float):
meta = {
"scenario": scenario,
"num_drones": num_drones,
"duration_s": duration,
"origin": {
"lat": REF_LAT,
"lon": REF_LON,
"alt": REF_ALT,
},
"phases": [
{"name": "staggered_start", "start_s": 0, "end_s": 10},
{"name": "grid_takeoff", "start_s": 10, "end_s": 25},
{"name": "grid_to_ring", "start_s": 25, "end_s": 40},
{"name": "ring_cruise", "start_s": 40, "end_s": 55},
{"name": "kamikaze_dive", "start_s": 55, "end_s": 75},
],
"edge_cases": {
"drone_12": "failure at t=60, attitude oscillation, crash at t=70",
"drone_13": "low-rate position data (10Hz)",
"drone_14": "high-rate attitude data (200Hz)",
"drone_15": "2-second data gap at t=50-52",
},
"files": [f"drone_{i:02d}.ulg" for i in range(num_drones)],
}
path = os.path.join(output_dir, "scenario.json")
with open(path, 'w') as f:
json.dump(meta, f, indent=2)
print(f" Wrote {path}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Generate synthetic ULog files for swarm replay testing.")
parser.add_argument("--output-dir", default="synthetic_swarm_ulogs",
help="Output directory (default: synthetic_swarm_ulogs)")
parser.add_argument("--num-drones", type=int, default=16,
help="Number of drones (default: 16)")
parser.add_argument("--duration", type=float, default=90.0,
help="Scenario duration in seconds (default: 90)")
parser.add_argument("--scenario", default="hawk_descent",
help="Scenario name (default: hawk_descent)")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
print(f"Generating {args.num_drones} drone ULog files for scenario '{args.scenario}'")
print(f" Duration: {args.duration}s")
print(f" Output: {os.path.abspath(args.output_dir)}/")
print()
for i in range(args.num_drones):
samples = generate_drone_ulog(i, args.num_drones, args.duration,
args.output_dir, args.scenario)
tag = ""
if i == 12:
tag = " [failure drone]"
elif i == 13:
tag = " [low-rate pos]"
elif i == 14:
tag = " [high-rate att]"
elif i == 15:
tag = " [data gap]"
print(f" drone_{i:02d}.ulg ({samples} messages){tag}")
write_scenario_json(args.output_dir, args.scenario, args.num_drones, args.duration)
print()
print(f"Done. {args.num_drones} ULog files written to {os.path.abspath(args.output_dir)}/")
if __name__ == "__main__":
main()

141
Tools/simulation/sih_swarm_run.sh Executable file
View File

@ -0,0 +1,141 @@
#!/bin/bash
# Launch N SIH quadrotor instances with XRCE-DDS agent.
# Usage: ./sih_swarm_run.sh [CONFIG_YAML] [SPEED_FACTOR]
#
# Requires: build/px4_sitl_sih (make px4_sitl_sih sihsim_quadx_vision)
set -euo pipefail
ulimit -n 4096
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
src_path="$SCRIPT_DIR/../../"
build_path="${src_path}/build/px4_sitl_sih"
CONFIG_YAML="${1:-${SCRIPT_DIR}/swarm_formation.yaml}"
SPEED_FACTOR="${2:-1}"
if [ ! -f "$CONFIG_YAML" ]; then
echo "ERROR: config not found: $CONFIG_YAML"
exit 1
fi
if [ ! -d "$build_path" ]; then
echo "ERROR: build directory not found: $build_path"
echo "Run: make px4_sitl_sih"
exit 1
fi
# --- Parse YAML config via Python ---
eval "$(python3 -c "
import yaml, sys
with open('${CONFIG_YAML}') as f:
cfg = yaml.safe_load(f)
origin = cfg['origin']
print(f\"ORIGIN_LAT={origin['lat']}\")
print(f\"ORIGIN_LON={origin['lon']}\")
print(f\"ORIGIN_ALT={origin['alt']}\")
target = cfg['target']
print(f\"TARGET_X={target['x']}\")
print(f\"TARGET_Y={target['y']}\")
print(f\"TARGET_Z={target['z']}\")
drones = cfg['formation']
print(f\"DRONE_COUNT={len(drones)}\")
for i, d in enumerate(drones):
print(f\"DRONE_{i}_ID={d['id']}\")
print(f\"DRONE_{i}_X={d['x']}\")
print(f\"DRONE_{i}_Y={d['y']}\")
print(f\"DRONE_{i}_Z={d['z']}\")
")"
echo "Swarm config: $DRONE_COUNT drones, origin ($ORIGIN_LAT, $ORIGIN_LON, ${ORIGIN_ALT}m)"
echo "Target: ($TARGET_X, $TARGET_Y, $TARGET_Z)"
echo "Speed factor: $SPEED_FACTOR"
echo ""
# --- Cleanup ---
CHILD_PIDS=()
AGENT_PID=""
cleanup() {
echo ""
echo "Shutting down..."
for pid in "${CHILD_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
if [ -n "$AGENT_PID" ]; then
kill "$AGENT_PID" 2>/dev/null || true
fi
pkill -x px4 2>/dev/null || true
wait 2>/dev/null
echo "All processes stopped."
}
trap cleanup SIGINT SIGTERM
# --- Kill existing instances ---
echo "Killing running instances..."
pkill -x px4 2>/dev/null || true
sleep 1
# --- Start XRCE-DDS agent (if available) ---
if command -v MicroXRCEAgent &>/dev/null; then
echo "Starting MicroXRCEAgent on UDP port 8888..."
MicroXRCEAgent udp4 -p 8888 > /dev/null 2>&1 &
AGENT_PID=$!
echo "XRCE-DDS agent PID: $AGENT_PID"
else
echo "WARNING: MicroXRCEAgent not found on host."
echo " Start it separately (e.g. from Docker container):"
echo " MicroXRCEAgent udp4 -p 8888"
fi
echo ""
# --- Port map ---
printf "%-10s %-12s %-14s %-14s\n" "Instance" "MAV_SYS_ID" "DDS Namespace" "Offboard Port"
printf "%-10s %-12s %-14s %-14s\n" "--------" "----------" "-------------" "-------------"
n=0
while [ $n -lt "$DRONE_COUNT" ]; do
sysid=$((n + 1))
printf "%-10s %-12s %-14s %-14s\n" "$n" "$sysid" "px4_${n}" "$((14540 + n))"
n=$((n + 1))
done
echo ""
# --- Launch instances ---
export PX4_SIM_MODEL=sihsim_quadx_vision
n=0
while [ $n -lt "$DRONE_COUNT" ]; do
eval "FX=\$DRONE_${n}_X"
eval "FY=\$DRONE_${n}_Y"
eval "FZ=\$DRONE_${n}_Z"
working_dir="$build_path/instance_$n"
[ ! -d "$working_dir" ] && mkdir -p "$working_dir"
export PX4_HOME_LAT="$ORIGIN_LAT"
export PX4_HOME_LON="$ORIGIN_LON"
export PX4_HOME_ALT="$ORIGIN_ALT"
export PX4_FORMATION_X="$FX"
export PX4_FORMATION_Y="$FY"
export PX4_FORMATION_Z="$FZ"
export PX4_TARGET_X="$TARGET_X"
export PX4_TARGET_Y="$TARGET_Y"
export PX4_TARGET_Z="$TARGET_Z"
export PX4_SIM_SPEED_FACTOR="$SPEED_FACTOR"
# Force consistent DDS namespace for all instances (including 0)
export PX4_UXRCE_DDS_NS="px4_$n"
pushd "$working_dir" &>/dev/null
echo "Starting instance $n (formation: $FX, $FY, $FZ)"
"$build_path/bin/px4" -i "$n" -d "$build_path/etc" >out.log 2>err.log &
CHILD_PIDS+=($!)
popd &>/dev/null
n=$((n + 1))
done
echo ""
echo "All $DRONE_COUNT instances launched. Press Ctrl-C to stop."
wait

View File

@ -0,0 +1,32 @@
# Formation config for SIH swarm simulation
# Used by sih_swarm_run.sh and swarm_ros2 coordinator
origin:
lat: 47.397742
lon: 8.545594
alt: 488.0
formation:
# NED offsets from origin (meters). 4x4 grid, 1m spacing, at 20m altitude
- {id: 0, x: 0.0, y: 0.0, z: -20.0}
- {id: 1, x: 0.0, y: 1.0, z: -20.0}
- {id: 2, x: 0.0, y: 2.0, z: -20.0}
- {id: 3, x: 0.0, y: 3.0, z: -20.0}
- {id: 4, x: 1.0, y: 0.0, z: -20.0}
- {id: 5, x: 1.0, y: 1.0, z: -20.0}
- {id: 6, x: 1.0, y: 2.0, z: -20.0}
- {id: 7, x: 1.0, y: 3.0, z: -20.0}
- {id: 8, x: 2.0, y: 0.0, z: -20.0}
- {id: 9, x: 2.0, y: 1.0, z: -20.0}
- {id: 10, x: 2.0, y: 2.0, z: -20.0}
- {id: 11, x: 2.0, y: 3.0, z: -20.0}
- {id: 12, x: 3.0, y: 0.0, z: -20.0}
- {id: 13, x: 3.0, y: 1.0, z: -20.0}
- {id: 14, x: 3.0, y: 2.0, z: -20.0}
- {id: 15, x: 3.0, y: 3.0, z: -20.0}
target:
# Car position: 200m north of origin, ground level
x: 200.0
y: 0.0
z: 0.0

View File

@ -0,0 +1,485 @@
#!/usr/bin/env python3
"""
Swarm Formation Designer - Create and preview formation YAML files for PX4 SIH swarm simulation.
Generates swarm_formation.yaml files with preset formation patterns (grid, circle, line,
v, diamond, random) for use with sih_swarm_run.sh and swarm_ros2 coordinator.
Usage examples:
# 16-drone grid with 2m spacing, preview in terminal
python3 swarm_formation_designer.py --pattern grid --count 16 --spacing 2.0 --preview
# 8-drone circle, radius 10m, write to file
python3 swarm_formation_designer.py --pattern circle --count 8 --radius 10 -o formation.yaml
# V formation with custom origin and target
python3 swarm_formation_designer.py --pattern v --count 7 --spacing 3.0 \\
--origin-lat 37.4 --origin-lon -122.0 --target-north 150 --target-east 50
"""
import argparse
import math
import random
import sys
from typing import List, Tuple
# Defaults matching PX4 SIH test site (Zurich area)
DEFAULT_LAT = 47.397742
DEFAULT_LON = 8.545594
DEFAULT_ALT = 488.0
DEFAULT_ALTITUDE = 20.0
DEFAULT_SPACING = 1.0
DEFAULT_RADIUS = 5.0
DEFAULT_COUNT = 16
DEFAULT_TARGET_NORTH = 200.0
DEFAULT_TARGET_EAST = 0.0
MIN_SEPARATION = 0.5 # meters, minimum allowed distance between any two drones
def generate_grid(count: int, spacing: float, altitude: float) -> List[dict]:
"""Arrange drones in a square grid pattern (ceil(sqrt(count)) columns)."""
cols = math.ceil(math.sqrt(count))
positions = []
for i in range(count):
row = i // cols
col = i % cols
positions.append({
"id": i,
"x": round(row * spacing, 2),
"y": round(col * spacing, 2),
"z": round(-altitude, 2),
})
return positions
def generate_circle(count: int, radius: float, altitude: float) -> List[dict]:
"""Arrange drones evenly spaced around a circle."""
positions = []
for i in range(count):
angle = 2.0 * math.pi * i / count
positions.append({
"id": i,
"x": round(radius * math.cos(angle), 2),
"y": round(radius * math.sin(angle), 2),
"z": round(-altitude, 2),
})
return positions
def generate_line(count: int, spacing: float, altitude: float) -> List[dict]:
"""Arrange drones in a line along the east (y) axis, centered at origin."""
positions = []
offset = (count - 1) * spacing / 2.0
for i in range(count):
positions.append({
"id": i,
"x": 0.0,
"y": round(i * spacing - offset, 2),
"z": round(-altitude, 2),
})
return positions
def generate_v(count: int, spacing: float, altitude: float) -> List[dict]:
"""Classic V formation with leader at front (northernmost position).
Leader is id 0 at the tip. Remaining drones alternate left/right,
each row stepping back (south) and outward (east/west).
"""
positions = [{"id": 0, "x": 0.0, "y": 0.0, "z": round(-altitude, 2)}]
wing_index = 1
for i in range(1, count):
side = 1 if (i % 2 == 1) else -1
row = (i + 1) // 2
positions.append({
"id": wing_index,
"x": round(-row * spacing, 2), # step back (south)
"y": round(side * row * spacing, 2), # spread east/west
"z": round(-altitude, 2),
})
wing_index += 1
return positions
def generate_diamond(count: int, spacing: float, altitude: float) -> List[dict]:
"""Diamond/rhombus formation.
Builds concentric diamond rings outward from the center. Ring k has 4*k
positions (except the center which has 1). Drones fill from center outward.
"""
# Generate diamond coordinates ring by ring
coords: List[Tuple[float, float]] = [(0.0, 0.0)]
ring = 1
while len(coords) < count:
# Each ring has 4 sides, each side has 'ring' segments
for side in range(4):
for step in range(ring):
if side == 0: # top-right edge
x = (ring - step) * spacing
y = step * spacing
elif side == 1: # bottom-right edge
x = -step * spacing
y = (ring - step) * spacing
elif side == 2: # bottom-left edge
x = -(ring - step) * spacing
y = -step * spacing
else: # top-left edge
x = step * spacing
y = -(ring - step) * spacing
coords.append((round(x, 2), round(y, 2)))
if len(coords) >= count:
break
if len(coords) >= count:
break
ring += 1
positions = []
for i in range(count):
positions.append({
"id": i,
"x": coords[i][0],
"y": coords[i][1],
"z": round(-altitude, 2),
})
return positions
def generate_random(count: int, spacing: float, altitude: float) -> List[dict]:
"""Random positions within a bounding box, enforcing minimum separation.
The bounding box size scales with count and spacing. Uses rejection
sampling to guarantee minimum separation between all drones.
"""
box_size = max(spacing * math.sqrt(count) * 2, spacing * 4)
positions = []
coords: List[Tuple[float, float]] = []
max_attempts = count * 1000
rng = random.Random(42) # deterministic for reproducibility
attempts = 0
while len(coords) < count and attempts < max_attempts:
x = round(rng.uniform(-box_size / 2, box_size / 2), 2)
y = round(rng.uniform(-box_size / 2, box_size / 2), 2)
too_close = False
for cx, cy in coords:
if math.sqrt((x - cx) ** 2 + (y - cy) ** 2) < MIN_SEPARATION:
too_close = True
break
if not too_close:
coords.append((x, y))
attempts += 1
if len(coords) < count:
print(
f"Warning: could only place {len(coords)}/{count} drones "
f"with minimum separation {MIN_SEPARATION}m in bounding box {box_size:.1f}m",
file=sys.stderr,
)
for i, (x, y) in enumerate(coords):
positions.append({
"id": i,
"x": x,
"y": y,
"z": round(-altitude, 2),
})
return positions
PATTERN_GENERATORS = {
"grid": generate_grid,
"circle": generate_circle,
"line": generate_line,
"v": generate_v,
"diamond": generate_diamond,
"random": generate_random,
}
def validate_no_overlap(positions: List[dict]) -> bool:
"""Check that no two drones occupy the same horizontal position (within MIN_SEPARATION)."""
for i in range(len(positions)):
for j in range(i + 1, len(positions)):
dx = positions[i]["x"] - positions[j]["x"]
dy = positions[i]["y"] - positions[j]["y"]
dist = math.sqrt(dx * dx + dy * dy)
if dist < MIN_SEPARATION:
print(
f"Error: drones {positions[i]['id']} and {positions[j]['id']} "
f"are only {dist:.2f}m apart (minimum: {MIN_SEPARATION}m)",
file=sys.stderr,
)
return False
return True
def build_formation_dict(
positions: List[dict],
origin_lat: float,
origin_lon: float,
origin_alt: float,
target_north: float,
target_east: float,
) -> dict:
"""Build the formation dictionary matching swarm_formation.yaml structure."""
return {
"origin": {
"lat": origin_lat,
"lon": origin_lon,
"alt": origin_alt,
},
"formation": positions,
"target": {
"x": target_north,
"y": target_east,
"z": 0.0,
},
}
def format_yaml(data: dict) -> str:
"""Format the formation data as YAML with flow-style formation entries.
Produces output matching the established swarm_formation.yaml style with
inline dicts for formation entries and block style for origin/target.
"""
lines = [
"# Formation config for SIH swarm simulation",
"# Generated by swarm_formation_designer.py",
"",
"origin:",
f" lat: {data['origin']['lat']}",
f" lon: {data['origin']['lon']}",
f" alt: {data['origin']['alt']}",
"",
"formation:",
]
# Determine padding width for aligned id field
max_id = max(p["id"] for p in data["formation"])
id_width = len(str(max_id))
for pos in data["formation"]:
id_str = str(pos["id"]).rjust(id_width)
lines.append(
f" - {{id: {id_str}, "
f"x: {pos['x']}, "
f"y: {pos['y']}, "
f"z: {pos['z']}}}"
)
lines.extend([
"",
"target:",
f" x: {data['target']['x']}",
f" y: {data['target']['y']}",
f" z: {data['target']['z']}",
"",
])
return "\n".join(lines)
def ascii_preview(positions: List[dict], target_north: float, target_east: float) -> str:
"""Render an ASCII top-down map showing drone positions and target.
North is up, East is right. Axes are labeled. The target is shown as 'T'
and drones as their id number (or '*' if id >= 10).
"""
width = 60
height = 30
# Collect all points (drones + target)
all_x = [p["x"] for p in positions] + [target_north]
all_y = [p["y"] for p in positions] + [target_east]
min_x, max_x = min(all_x), max(all_x)
min_y, max_y = min(all_y), max(all_y)
# Add margin
range_x = max_x - min_x if max_x != min_x else 1.0
range_y = max_y - min_y if max_y != min_y else 1.0
margin = 0.1
min_x -= range_x * margin
max_x += range_x * margin
min_y -= range_y * margin
max_y += range_y * margin
range_x = max_x - min_x
range_y = max_y - min_y
def to_grid(north: float, east: float) -> Tuple[int, int]:
col = int((east - min_y) / range_y * (width - 1))
row = int((max_x - north) / range_x * (height - 1)) # north is up
col = max(0, min(width - 1, col))
row = max(0, min(height - 1, row))
return row, col
# Initialize grid
grid = [[" " for _ in range(width)] for _ in range(height)]
# Place target
tr, tc = to_grid(target_north, target_east)
grid[tr][tc] = "T"
# Place drones (overwrite target if on same cell)
for pos in positions:
r, c = to_grid(pos["x"], pos["y"])
label = str(pos["id"]) if pos["id"] < 10 else "*"
grid[r][c] = label
# Build output
out = []
out.append(f" Formation Preview ({len(positions)} drones)")
out.append(f" North ^ (x range: {min_x:.1f} to {max_x:.1f} m)")
out.append(" " + "-" * (width + 2))
for row in grid:
out.append(" |" + "".join(row) + "|")
out.append(" " + "-" * (width + 2))
out.append(f" East -> (y range: {min_y:.1f} to {max_y:.1f} m)")
out.append("")
out.append(" Legend: 0-9 = drone id, * = drone id >= 10, T = target")
return "\n".join(out)
def parse_args(argv: "List[str] | None" = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Create and preview swarm formation YAML files for PX4 SIH simulation.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Examples:\n"
" %(prog)s --pattern grid --count 16 --spacing 2 --preview\n"
" %(prog)s --pattern circle --count 8 --radius 10 -o my_formation.yaml\n"
" %(prog)s --pattern v --count 7 --spacing 3 --preview -o v_formation.yaml\n"
),
)
parser.add_argument(
"--pattern",
choices=list(PATTERN_GENERATORS.keys()),
default="grid",
help="Formation pattern (default: grid)",
)
parser.add_argument(
"--count",
type=int,
default=DEFAULT_COUNT,
help=f"Number of drones (default: {DEFAULT_COUNT})",
)
parser.add_argument(
"--spacing",
type=float,
default=DEFAULT_SPACING,
help=f"Spacing between drones in meters (default: {DEFAULT_SPACING})",
)
parser.add_argument(
"--altitude",
type=float,
default=DEFAULT_ALTITUDE,
help=f"Flight altitude in meters AGL (default: {DEFAULT_ALTITUDE})",
)
parser.add_argument(
"--radius",
type=float,
default=DEFAULT_RADIUS,
help=f"Radius for circle pattern in meters (default: {DEFAULT_RADIUS})",
)
parser.add_argument(
"--origin-lat",
type=float,
default=DEFAULT_LAT,
help=f"Origin latitude (default: {DEFAULT_LAT})",
)
parser.add_argument(
"--origin-lon",
type=float,
default=DEFAULT_LON,
help=f"Origin longitude (default: {DEFAULT_LON})",
)
parser.add_argument(
"--origin-alt",
type=float,
default=DEFAULT_ALT,
help=f"Origin altitude AMSL in meters (default: {DEFAULT_ALT})",
)
parser.add_argument(
"--target-north",
type=float,
default=DEFAULT_TARGET_NORTH,
help=f"Target position north of origin in meters (default: {DEFAULT_TARGET_NORTH})",
)
parser.add_argument(
"--target-east",
type=float,
default=DEFAULT_TARGET_EAST,
help=f"Target position east of origin in meters (default: {DEFAULT_TARGET_EAST})",
)
parser.add_argument(
"--preview",
action="store_true",
help="Show ASCII preview of the formation",
)
parser.add_argument(
"-o", "--output",
type=str,
default=None,
help="Output YAML file path (default: print to stdout)",
)
args = parser.parse_args(argv)
if args.count < 1:
parser.error("--count must be at least 1")
if args.spacing <= 0:
parser.error("--spacing must be positive")
if args.altitude <= 0:
parser.error("--altitude must be positive")
if args.radius <= 0:
parser.error("--radius must be positive")
return args
def main(argv: "List[str] | None" = None) -> int:
args = parse_args(argv)
# Select generator and dispatch with appropriate arguments
generator = PATTERN_GENERATORS[args.pattern]
if args.pattern == "circle":
positions = generator(args.count, args.radius, args.altitude)
else:
positions = generator(args.count, args.spacing, args.altitude)
# Validate
if not validate_no_overlap(positions):
return 1
# Build output
data = build_formation_dict(
positions,
args.origin_lat,
args.origin_lon,
args.origin_alt,
args.target_north,
args.target_east,
)
yaml_text = format_yaml(data)
# Preview
if args.preview:
print(ascii_preview(positions, args.target_north, args.target_east))
print()
# Output
if args.output:
with open(args.output, "w") as f:
f.write(yaml_text)
print(f"Wrote {len(positions)} drone formation to {args.output}")
else:
if not args.preview:
print(yaml_text, end="")
else:
print(yaml_text, end="")
return 0
if __name__ == "__main__":
sys.exit(main())