Compare commits

...

4 Commits

Author SHA1 Message Date
Ramon Roche 63cec47786 feat(simulation): add ROS 2 swarm controller package
ROS 2 package for coordinating PX4 SIH multi-drone swarm missions via
XRCE-DDS. Includes:

- swarm_controller.py: orchestrates N drones through 6 phases
  (EV lock, arm/takeoff, grid hold, circle transition, circle hold,
  kamikaze dive) with DDS cross-talk filtering for multi-instance
  XRCE-DDS setups
- vision_provider.py: publishes noisy VehicleOdometry from ground
  truth for EKF2 external vision fusion (placeholder for non-GPS)
- swarm_launch.py: ROS 2 launch file for N vision providers + controller
- run_swarm.sh: Docker entrypoint that launches PX4 instances,
  XRCE-DDS agent, and ROS 2 nodes
- Dockerfile + build_docker.sh: containerized build with ROS 2 Jazzy,
  px4_msgs from PX4 source, and the swarm package

Validated with 4 drones: all phases complete, 4/4 impacts, ULogs
generated per instance (~16 MB each).

Signed-off-by: Ramon Roche <mrpollo@gmail.com>
2026-03-16 07:44:43 -07:00
Ramon Roche 8bb863f417 feat(simulation): add swarm infrastructure scripts
- swarm_formation.yaml: default 16-drone 4x4 grid formation config
- swarm_formation_designer.py: CLI to generate/preview formation YAML
  files with grid, circle, line, v, diamond, and random patterns
- sih_swarm_run.sh: launcher for N SIH instances with XRCE-DDS agent,
  reads formation YAML for per-drone positions
- collect_swarm_ulogs.py: collects ULog files from all instances into
  a single output directory with drone ID prefixes
- generate_swarm_ulogs.py: synthetic ULog generator for viewer testing
  with staggered starts, formation phases, and edge cases

Signed-off-by: Ramon Roche <mrpollo@gmail.com>
2026-03-16 07:44:32 -07:00
Ramon Roche 2ae4ca4bf4 fix(mavlink): remove port cap that forced instances 10+ to share port
The line `[ "$px4_instance" -gt 9 ] && udp_offboard_port_remote=14549`
forced all instances above 9 to share the same offboard port, breaking
multi-instance setups with 10+ drones. Without it, each instance gets
a unique port 14540+i with no overlap.

Signed-off-by: Ramon Roche <mrpollo@gmail.com>
2026-03-16 07:44:19 -07:00
Ramon Roche 2fc70dea3e feat(simulation): add SIH SITL board and quadx_vision airframe
Add px4_sitl_sih board configuration (Gazebo disabled) and airframe
10046_sihsim_quadx_vision for GPS-based SIH multi-instance swarm
simulation. The airframe enables simulated GPS, baro, and mag sensors
with standard quadrotor X configuration.

Signed-off-by: Ramon Roche <mrpollo@gmail.com>
2026-03-16 07:44:12 -07:00
22 changed files with 2537 additions and 1 deletions
@@ -0,0 +1,38 @@
#!/bin/sh
#
# @name QuadrotorX SITL for SIH (Swarm)
#
# @type Quadrotor
#
# @maintainer Ramon Roche <mrpollo@gmail.com>
#
. ${R}etc/init.d/rc.mc_defaults
PX4_SIMULATOR=${PX4_SIMULATOR:=sihsim}
PX4_SIM_MODEL=${PX4_SIM_MODEL:=quadx}
# Simulated sensors: GPS + baro + mag
param set-default SENS_EN_GPSSIM 1
param set-default SENS_EN_BAROSIM 1
param set-default SENS_EN_MAGSIM 1
# Square quadrotor X PX4 numbering
param set-default CA_ROTOR_COUNT 4
param set-default CA_ROTOR0_PX 1
param set-default CA_ROTOR0_PY 1
param set-default CA_ROTOR1_PX -1
param set-default CA_ROTOR1_PY -1
param set-default CA_ROTOR2_PX 1
param set-default CA_ROTOR2_PY -1
param set-default CA_ROTOR2_KM -0.05
param set-default CA_ROTOR3_PX -1
param set-default CA_ROTOR3_PY 1
param set-default CA_ROTOR3_KM -0.05
param set-default PWM_MAIN_FUNC1 101
param set-default PWM_MAIN_FUNC2 102
param set-default PWM_MAIN_FUNC3 103
param set-default PWM_MAIN_FUNC4 104
param set SIH_VEHICLE_TYPE 0
@@ -107,6 +107,7 @@ px4_add_romfs_files(
10043_sihsim_standard_vtol
10044_sihsim_hex
10045_sihsim_rover_ackermann
10046_sihsim_quadx_vision
17001_flightgear_tf-g1
17002_flightgear_tf-g2
@@ -3,7 +3,6 @@
udp_offboard_port_local=$((14580+px4_instance))
udp_offboard_port_remote=$((14540+px4_instance))
[ "$px4_instance" -gt 9 ] && udp_offboard_port_remote=14549 # use the same ports for more than 10 instances to avoid port overlaps
udp_onboard_payload_port_local=$((14280+px4_instance))
udp_onboard_payload_port_remote=$((14030+px4_instance))
udp_onboard_gimbal_port_local=$((13030+px4_instance))
+142
View File
@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""Collect ULog files from SIH swarm simulation instances.
Gathers .ulg files from per-instance log directories and copies them
into a single output folder with filenames that identify the source instance.
"""
import argparse
import glob
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
def find_repo_root() -> Path:
"""Walk up from script location to find the repository root."""
path = Path(__file__).resolve().parent
while path != path.parent:
if (path / "ROMFS").is_dir() and (path / "src").is_dir():
return path
path = path.parent
return Path(__file__).resolve().parent.parent.parent
def parse_instance_number(filepath: str) -> int | None:
"""Extract the instance number from a path containing instance_N."""
parts = Path(filepath).parts
for part in parts:
if part.startswith("instance_"):
try:
return int(part.split("_", 1)[1])
except (ValueError, IndexError):
continue
return None
def collect_ulogs(build_dir: Path, output_dir: Path, max_instances: int | None) -> None:
"""Find and copy ULog files from swarm instance directories."""
pattern = str(build_dir / "instance_*" / "log" / "**" / "*.ulg")
ulog_files = sorted(glob.glob(pattern, recursive=True))
if not ulog_files:
print(f"No ULog files found matching: {pattern}")
print()
print("Troubleshooting:")
print(f" - Verify build directory exists: {build_dir}")
print(f" - Check for instance_* subdirectories in {build_dir}")
print(" - Ensure the swarm simulation has run and produced logs")
sys.exit(1)
# Filter by instance count if specified.
if max_instances is not None:
ulog_files = [
f for f in ulog_files
if (n := parse_instance_number(f)) is not None and n < max_instances
]
if not ulog_files:
assert max_instances is not None
print(f"No ULog files found for the requested instance range (0..{max_instances - 1}).")
sys.exit(1)
output_dir.mkdir(parents=True, exist_ok=True)
total_size = 0
instances_seen: set[int] = set()
copied = 0
for src in ulog_files:
instance = parse_instance_number(src)
if instance is None:
print(f" skipping (could not determine instance): {src}")
continue
instances_seen.add(instance)
original_name = Path(src).name
dest_name = f"drone_{instance:02d}_{original_name}"
dest_path = output_dir / dest_name
shutil.copy2(src, dest_path)
file_size = os.path.getsize(src)
total_size += file_size
copied += 1
print(f" [{instance:02d}] {original_name} ({file_size / 1024:.1f} KB)")
print()
print("Summary")
print("-" * 40)
print(f" Files collected : {copied}")
print(f" Instances : {sorted(instances_seen)}")
print(f" Total size : {total_size / (1024 * 1024):.2f} MB")
print(f" Output directory: {output_dir.resolve()}")
def main() -> None:
repo_root = find_repo_root()
default_build = str(repo_root / "build" / "px4_sitl_sih")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
parser = argparse.ArgumentParser(
description="Collect ULog files from SIH swarm simulation instances.",
)
parser.add_argument(
"--build-dir",
default=default_build,
help=f"Path to the build directory (default: {default_build})",
)
parser.add_argument(
"--output-dir",
default=None,
help="Output directory (default: swarm_ulogs_YYYYMMDD_HHMMSS)",
)
parser.add_argument(
"--instances",
type=int,
default=None,
help="Number of instances to collect from (default: auto-detect)",
)
args = parser.parse_args()
build_dir = Path(args.build_dir)
if not build_dir.is_dir():
print(f"Error: build directory does not exist: {build_dir}")
sys.exit(1)
if args.output_dir is not None:
output_dir = Path(args.output_dir)
else:
output_dir = Path(f"swarm_ulogs_{timestamp}")
print(f"Collecting ULog files from: {build_dir}")
print(f"Output directory: {output_dir}")
print()
collect_ulogs(build_dir, output_dir, args.instances)
if __name__ == "__main__":
main()
+487
View File
@@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""Generate synthetic ULog files for testing mavsim-viewer swarm replay.
Creates N drone ULog files with crafted trajectories in valid ULog binary
format, without requiring any real PX4 simulation.
"""
import argparse
import json
import math
import os
import struct
# ---------------------------------------------------------------------------
# ULog binary writer
# ---------------------------------------------------------------------------
ULOG_MAGIC = b'\x55\x4c\x6f\x67\x01\x12\x35'
ULOG_VERSION = 1
class ULogWriter:
"""Write a minimal but valid ULog binary file."""
def __init__(self, path: str, timestamp_us: int = 0):
self._path = path
self._fp = open(path, 'wb')
self._msg_id_counter = 0
self._subscriptions: dict[str, int] = {} # topic_name -> msg_id
self._write_header(timestamp_us)
self._write_flag_bits()
# -- low-level helpers --------------------------------------------------
def _write_raw(self, data: bytes):
self._fp.write(data)
def _write_message(self, msg_type: int, payload: bytes):
"""Write a ULog message: uint16 size + uint8 type + payload."""
self._write_raw(struct.pack('<HB', len(payload), msg_type))
self._write_raw(payload)
def _write_header(self, timestamp_us: int):
self._write_raw(ULOG_MAGIC)
self._write_raw(struct.pack('<BQ', ULOG_VERSION, timestamp_us))
def _write_flag_bits(self):
# 'B' message: 8 compat + 8 incompat + 3*uint64 appended offsets
payload = b'\x00' * 8 + b'\x00' * 8 + struct.pack('<QQQ', 0, 0, 0)
self._write_message(0x42, payload)
# -- definition messages ------------------------------------------------
def write_format(self, fmt_string: str):
"""Write a Format ('F') definition message.
fmt_string example: "vehicle_local_position:uint64_t timestamp;float x;float y"
"""
payload = fmt_string.encode('ascii')
self._write_message(0x46, payload)
def write_info(self, key: str, value: str):
"""Write an Info ('I') message with a string value."""
key_with_type = f"char[{len(value)}] {key}"
key_bytes = key_with_type.encode('ascii')
value_bytes = value.encode('ascii')
payload = struct.pack('<B', len(key_bytes)) + key_bytes + value_bytes
self._write_message(0x49, payload)
def add_subscription(self, topic_name: str, multi_id: int = 0) -> int:
"""Write an AddSubscription ('A') message. Returns the assigned msg_id."""
msg_id = self._msg_id_counter
self._msg_id_counter += 1
self._subscriptions[topic_name] = msg_id
name_bytes = topic_name.encode('ascii')
payload = struct.pack('<BH', multi_id, msg_id) + name_bytes
self._write_message(0x41, payload)
return msg_id
def write_data(self, topic_name: str, data_bytes: bytes):
"""Write a Data ('D') message for a subscribed topic.
data_bytes must be the fully serialized payload (starting with
uint64 timestamp) matching the format definition.
"""
msg_id = self._subscriptions[topic_name]
payload = struct.pack('<H', msg_id) + data_bytes
self._write_message(0x44, payload)
def close(self):
self._fp.close()
# ---------------------------------------------------------------------------
# Topic serialization helpers
# ---------------------------------------------------------------------------
def pack_vehicle_local_position(timestamp_us: int, x: float, y: float, z: float,
vx: float, vy: float, vz: float,
ref_lat: float, ref_lon: float, ref_alt: float) -> bytes:
return struct.pack('<Qffffffddf',
timestamp_us, x, y, z, vx, vy, vz,
ref_lat, ref_lon, ref_alt)
def pack_vehicle_attitude(timestamp_us: int, q: tuple[float, ...]) -> bytes:
return struct.pack('<Qffff', timestamp_us, q[0], q[1], q[2], q[3])
def pack_vehicle_global_position(timestamp_us: int, lat: float, lon: float, alt: float) -> bytes:
return struct.pack('<Qddf', timestamp_us, lat, lon, alt)
def pack_vehicle_status(timestamp_us: int, vehicle_type: int, nav_state: int) -> bytes:
return struct.pack('<QBB', timestamp_us, vehicle_type, nav_state)
# ---------------------------------------------------------------------------
# Format definition strings (must match pack functions above)
# ---------------------------------------------------------------------------
FMT_LOCAL_POS = (
"vehicle_local_position:"
"uint64_t timestamp;"
"float x;float y;float z;"
"float vx;float vy;float vz;"
"double ref_lat;double ref_lon;float ref_alt"
)
FMT_ATTITUDE = (
"vehicle_attitude:"
"uint64_t timestamp;"
"float[4] q"
)
FMT_GLOBAL_POS = (
"vehicle_global_position:"
"uint64_t timestamp;"
"double lat;double lon;float alt"
)
FMT_STATUS = (
"vehicle_status:"
"uint64_t timestamp;"
"uint8_t vehicle_type;uint8_t nav_state"
)
# ---------------------------------------------------------------------------
# Trajectory math
# ---------------------------------------------------------------------------
REF_LAT = 47.397742
REF_LON = 8.545594
REF_ALT = 488.0
NAV_TAKEOFF = 2
NAV_OFFBOARD = 14
NAV_LAND = 18
def quat_from_yaw(yaw: float) -> tuple[float, float, float, float]:
return (math.cos(yaw / 2.0), 0.0, 0.0, math.sin(yaw / 2.0))
def smooth_interp(t: float) -> float:
"""Cosine interpolation factor 0..1."""
return 0.5 - 0.5 * math.cos(math.pi * max(0.0, min(1.0, t)))
def grid_position(drone_id: int) -> tuple[float, float]:
"""4x4 grid, 3m spacing, centered on origin."""
row = drone_id // 4
col = drone_id % 4
x = (row - 1.5) * 3.0
y = (col - 1.5) * 3.0
return x, y
def circle_position(drone_id: int, num_drones: int, radius: float = 30.0) -> tuple[float, float]:
angle = 2.0 * math.pi * drone_id / num_drones
return radius * math.cos(angle), radius * math.sin(angle)
def local_to_global(x: float, y: float, z: float) -> tuple[float, float, float]:
lat = REF_LAT + (x / 111320.0)
lon = REF_LON + (y / (111320.0 * math.cos(math.radians(REF_LAT))))
alt = REF_ALT - z
return lat, lon, alt
# ---------------------------------------------------------------------------
# Phase computations
# ---------------------------------------------------------------------------
TARGET = (200.0, 0.0, 0.0) # NED target for kamikaze
def compute_state(drone_id: int, t: float, num_drones: int, _duration: float):
"""Return (x, y, z, vx, vy, vz, yaw, nav_state) for a drone at time t.
Returns None if the drone hasn't started yet or has impacted.
"""
start_time = drone_id * 0.5 # staggered start
if t < start_time:
return None
# Failure drone 12 -- crash at t=70, log ends
if drone_id == 12 and t > 70.0:
return None
# Default nav state
nav_state = NAV_OFFBOARD
yaw = 0.0
vx, vy, vz = 0.0, 0.0, 0.0
gx, gy = grid_position(drone_id)
cx, cy = circle_position(drone_id, num_drones)
if t < 10.0:
# Phase 1: staggered start, on ground rising slightly
frac = smooth_interp((t - start_time) / max(0.01, 10.0 - start_time))
x, y = gx * frac, gy * frac
z = 0.0
nav_state = NAV_TAKEOFF
elif t < 25.0:
# Phase 2: grid takeoff, ascend to -20 NED
frac = smooth_interp((t - 10.0) / 15.0)
x, y = gx, gy
z = -20.0 * frac
vz = -20.0 / 15.0 if frac < 1.0 else 0.0
nav_state = NAV_TAKEOFF
elif t < 40.0:
# Phase 3: grid -> ring morph
frac = smooth_interp((t - 25.0) / 15.0)
x = gx + (cx - gx) * frac
y = gy + (cy - gy) * frac
z = -20.0
vx = (cx - gx) / 15.0
vy = (cy - gy) / 15.0
nav_state = NAV_OFFBOARD
elif t < 55.0:
# Phase 4: ring cruise CW at 3 m/s
angular_speed = 3.0 / 30.0 # omega = v/r
base_angle = 2.0 * math.pi * drone_id / num_drones
angle = base_angle - angular_speed * (t - 40.0) # CW -> negative
x = 30.0 * math.cos(angle)
y = 30.0 * math.sin(angle)
z = -20.0
vx = 30.0 * angular_speed * math.sin(angle)
vy = -30.0 * angular_speed * math.cos(angle)
yaw = math.atan2(-y, -x) # face center
nav_state = NAV_OFFBOARD
else:
# Phase 5/6: Kamikaze dive toward target
# Starting position at t=55
base_angle = 2.0 * math.pi * drone_id / num_drones
angular_speed = 3.0 / 30.0
angle55 = base_angle - angular_speed * 15.0
start_x = 30.0 * math.cos(angle55)
start_y = 30.0 * math.sin(angle55)
start_z = -20.0
dx = TARGET[0] - start_x
dy = TARGET[1] - start_y
dz = TARGET[2] - start_z
dist = math.sqrt(dx * dx + dy * dy + dz * dz)
speed = 12.0 # m/s max
nx, ny, nz = dx / dist, dy / dist, dz / dist
dt = t - 55.0
x = start_x + nx * speed * dt
y = start_y + ny * speed * dt
z = start_z + nz * speed * dt
vx = nx * speed
vy = ny * speed
vz = nz * speed
yaw = math.atan2(ny, nx)
nav_state = NAV_LAND
# Impact: close to target or above ground
cur_dist = math.sqrt((TARGET[0] - x) ** 2 + (TARGET[1] - y) ** 2 + (TARGET[2] - z) ** 2)
if cur_dist < 2.0 or z > -1.0:
return None
# Drone 12 failure behavior (attitude wobble from t=60)
if drone_id == 12 and t >= 60.0:
wobble_t = t - 60.0
wobble_amp = min(wobble_t * 0.1, 0.8) # growing oscillation
roll = wobble_amp * math.sin(wobble_t * 5.0)
pitch = wobble_amp * math.cos(wobble_t * 4.0)
# drift sideways
x += wobble_t * 2.0
y += wobble_t * 1.5
z += wobble_t * 0.5 # descending
# Build quaternion with roll/pitch/yaw
cr, sr = math.cos(roll / 2), math.sin(roll / 2)
cp, sp = math.cos(pitch / 2), math.sin(pitch / 2)
cy, sy = math.cos(yaw / 2), math.sin(yaw / 2)
q = (
cr * cp * cy + sr * sp * sy,
sr * cp * cy - cr * sp * sy,
cr * sp * cy + sr * cp * sy,
cr * cp * sy - sr * sp * cy,
)
return (x, y, z, vx, vy, vz, q, nav_state)
return (x, y, z, vx, vy, vz, quat_from_yaw(yaw), nav_state)
# ---------------------------------------------------------------------------
# Generate a single drone ULog
# ---------------------------------------------------------------------------
def generate_drone_ulog(drone_id: int, num_drones: int, duration: float,
output_dir: str, scenario: str):
filename = os.path.join(output_dir, f"drone_{drone_id:02d}.ulg")
start_us = drone_id * 500_000 # stagger by 0.5s in timestamps
writer = ULogWriter(filename, timestamp_us=start_us)
# Write format definitions
writer.write_format(FMT_LOCAL_POS)
writer.write_format(FMT_ATTITUDE)
writer.write_format(FMT_GLOBAL_POS)
writer.write_format(FMT_STATUS)
# Write info messages
writer.write_info("sys_name", "PX4")
writer.write_info("ver_hw", "SITL")
writer.write_info("scenario", scenario)
writer.write_info("drone_id", str(drone_id))
# Add subscriptions
writer.add_subscription("vehicle_local_position")
writer.add_subscription("vehicle_attitude")
writer.add_subscription("vehicle_global_position")
writer.add_subscription("vehicle_status")
# Determine rates per drone (edge cases)
pos_dt = 0.02 # 50 Hz
att_dt = 0.01 # 100 Hz
if drone_id == 13:
pos_dt = 0.1 # 10 Hz low rate
if drone_id == 14:
att_dt = 0.005 # 200 Hz high rate
# Generate data
# We iterate at the finest resolution needed and emit when due
fine_dt = min(pos_dt, att_dt, 0.001)
# Use the finest rate needed for this drone
fine_dt = min(pos_dt, att_dt)
status_interval = 1.0 # 1 Hz
next_pos_t = 0.0
next_att_t = 0.0
next_status_t = 0.0
last_nav_state = -1
t = 0.0
samples_written = 0
while t <= duration:
state = compute_state(drone_id, t, num_drones, duration)
# Drone 15 data gap at t=50-52
in_gap = (drone_id == 15 and 50.0 <= t <= 52.0)
if state is not None and not in_gap:
x, y, z, vx, vy, vz, q, nav_state = state
ts = int(t * 1_000_000)
# Position data
if t >= next_pos_t:
writer.write_data("vehicle_local_position",
pack_vehicle_local_position(
ts, x, y, z, vx, vy, vz,
REF_LAT, REF_LON, REF_ALT))
lat, lon, alt = local_to_global(x, y, z)
writer.write_data("vehicle_global_position",
pack_vehicle_global_position(ts, lat, lon, alt))
next_pos_t = t + pos_dt
samples_written += 2
# Attitude data
if t >= next_att_t:
writer.write_data("vehicle_attitude",
pack_vehicle_attitude(ts, q))
next_att_t = t + att_dt
samples_written += 1
# Status data
if t >= next_status_t or nav_state != last_nav_state:
writer.write_data("vehicle_status",
pack_vehicle_status(ts, 2, nav_state))
next_status_t = t + status_interval
last_nav_state = nav_state
samples_written += 1
t += fine_dt
writer.close()
return samples_written
# ---------------------------------------------------------------------------
# Scenario metadata
# ---------------------------------------------------------------------------
def write_scenario_json(output_dir: str, scenario: str, num_drones: int, duration: float):
meta = {
"scenario": scenario,
"num_drones": num_drones,
"duration_s": duration,
"origin": {
"lat": REF_LAT,
"lon": REF_LON,
"alt": REF_ALT,
},
"phases": [
{"name": "staggered_start", "start_s": 0, "end_s": 10},
{"name": "grid_takeoff", "start_s": 10, "end_s": 25},
{"name": "grid_to_ring", "start_s": 25, "end_s": 40},
{"name": "ring_cruise", "start_s": 40, "end_s": 55},
{"name": "kamikaze_dive", "start_s": 55, "end_s": 75},
],
"edge_cases": {
"drone_12": "failure at t=60, attitude oscillation, crash at t=70",
"drone_13": "low-rate position data (10Hz)",
"drone_14": "high-rate attitude data (200Hz)",
"drone_15": "2-second data gap at t=50-52",
},
"files": [f"drone_{i:02d}.ulg" for i in range(num_drones)],
}
path = os.path.join(output_dir, "scenario.json")
with open(path, 'w') as f:
json.dump(meta, f, indent=2)
print(f" Wrote {path}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="Generate synthetic ULog files for swarm replay testing.")
parser.add_argument("--output-dir", default="synthetic_swarm_ulogs",
help="Output directory (default: synthetic_swarm_ulogs)")
parser.add_argument("--num-drones", type=int, default=16,
help="Number of drones (default: 16)")
parser.add_argument("--duration", type=float, default=90.0,
help="Scenario duration in seconds (default: 90)")
parser.add_argument("--scenario", default="hawk_descent",
help="Scenario name (default: hawk_descent)")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
print(f"Generating {args.num_drones} drone ULog files for scenario '{args.scenario}'")
print(f" Duration: {args.duration}s")
print(f" Output: {os.path.abspath(args.output_dir)}/")
print()
for i in range(args.num_drones):
samples = generate_drone_ulog(i, args.num_drones, args.duration,
args.output_dir, args.scenario)
tag = ""
if i == 12:
tag = " [failure drone]"
elif i == 13:
tag = " [low-rate pos]"
elif i == 14:
tag = " [high-rate att]"
elif i == 15:
tag = " [data gap]"
print(f" drone_{i:02d}.ulg ({samples} messages){tag}")
write_scenario_json(args.output_dir, args.scenario, args.num_drones, args.duration)
print()
print(f"Done. {args.num_drones} ULog files written to {os.path.abspath(args.output_dir)}/")
if __name__ == "__main__":
main()
+141
View File
@@ -0,0 +1,141 @@
#!/bin/bash
# Launch N SIH quadrotor instances with XRCE-DDS agent.
# Usage: ./sih_swarm_run.sh [CONFIG_YAML] [SPEED_FACTOR]
#
# Requires: build/px4_sitl_sih (make px4_sitl_sih sihsim_quadx_vision)
set -euo pipefail
ulimit -n 4096
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
src_path="$SCRIPT_DIR/../../"
build_path="${src_path}/build/px4_sitl_sih"
CONFIG_YAML="${1:-${SCRIPT_DIR}/swarm_formation.yaml}"
SPEED_FACTOR="${2:-1}"
if [ ! -f "$CONFIG_YAML" ]; then
echo "ERROR: config not found: $CONFIG_YAML"
exit 1
fi
if [ ! -d "$build_path" ]; then
echo "ERROR: build directory not found: $build_path"
echo "Run: make px4_sitl_sih"
exit 1
fi
# --- Parse YAML config via Python ---
eval "$(python3 -c "
import yaml, sys
with open('${CONFIG_YAML}') as f:
cfg = yaml.safe_load(f)
origin = cfg['origin']
print(f\"ORIGIN_LAT={origin['lat']}\")
print(f\"ORIGIN_LON={origin['lon']}\")
print(f\"ORIGIN_ALT={origin['alt']}\")
target = cfg['target']
print(f\"TARGET_X={target['x']}\")
print(f\"TARGET_Y={target['y']}\")
print(f\"TARGET_Z={target['z']}\")
drones = cfg['formation']
print(f\"DRONE_COUNT={len(drones)}\")
for i, d in enumerate(drones):
print(f\"DRONE_{i}_ID={d['id']}\")
print(f\"DRONE_{i}_X={d['x']}\")
print(f\"DRONE_{i}_Y={d['y']}\")
print(f\"DRONE_{i}_Z={d['z']}\")
")"
echo "Swarm config: $DRONE_COUNT drones, origin ($ORIGIN_LAT, $ORIGIN_LON, ${ORIGIN_ALT}m)"
echo "Target: ($TARGET_X, $TARGET_Y, $TARGET_Z)"
echo "Speed factor: $SPEED_FACTOR"
echo ""
# --- Cleanup ---
CHILD_PIDS=()
AGENT_PID=""
cleanup() {
echo ""
echo "Shutting down..."
for pid in "${CHILD_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
if [ -n "$AGENT_PID" ]; then
kill "$AGENT_PID" 2>/dev/null || true
fi
pkill -x px4 2>/dev/null || true
wait 2>/dev/null
echo "All processes stopped."
}
trap cleanup SIGINT SIGTERM
# --- Kill existing instances ---
echo "Killing running instances..."
pkill -x px4 2>/dev/null || true
sleep 1
# --- Start XRCE-DDS agent (if available) ---
if command -v MicroXRCEAgent &>/dev/null; then
echo "Starting MicroXRCEAgent on UDP port 8888..."
MicroXRCEAgent udp4 -p 8888 > /dev/null 2>&1 &
AGENT_PID=$!
echo "XRCE-DDS agent PID: $AGENT_PID"
else
echo "WARNING: MicroXRCEAgent not found on host."
echo " Start it separately (e.g. from Docker container):"
echo " MicroXRCEAgent udp4 -p 8888"
fi
echo ""
# --- Port map ---
printf "%-10s %-12s %-14s %-14s\n" "Instance" "MAV_SYS_ID" "DDS Namespace" "Offboard Port"
printf "%-10s %-12s %-14s %-14s\n" "--------" "----------" "-------------" "-------------"
n=0
while [ $n -lt "$DRONE_COUNT" ]; do
sysid=$((n + 1))
printf "%-10s %-12s %-14s %-14s\n" "$n" "$sysid" "px4_${n}" "$((14540 + n))"
n=$((n + 1))
done
echo ""
# --- Launch instances ---
export PX4_SIM_MODEL=sihsim_quadx_vision
n=0
while [ $n -lt "$DRONE_COUNT" ]; do
eval "FX=\$DRONE_${n}_X"
eval "FY=\$DRONE_${n}_Y"
eval "FZ=\$DRONE_${n}_Z"
working_dir="$build_path/instance_$n"
[ ! -d "$working_dir" ] && mkdir -p "$working_dir"
export PX4_HOME_LAT="$ORIGIN_LAT"
export PX4_HOME_LON="$ORIGIN_LON"
export PX4_HOME_ALT="$ORIGIN_ALT"
export PX4_FORMATION_X="$FX"
export PX4_FORMATION_Y="$FY"
export PX4_FORMATION_Z="$FZ"
export PX4_TARGET_X="$TARGET_X"
export PX4_TARGET_Y="$TARGET_Y"
export PX4_TARGET_Z="$TARGET_Z"
export PX4_SIM_SPEED_FACTOR="$SPEED_FACTOR"
# Force consistent DDS namespace for all instances (including 0)
export PX4_UXRCE_DDS_NS="px4_$n"
pushd "$working_dir" &>/dev/null
echo "Starting instance $n (formation: $FX, $FY, $FZ)"
"$build_path/bin/px4" -i "$n" -d "$build_path/etc" >out.log 2>err.log &
CHILD_PIDS+=($!)
popd &>/dev/null
n=$((n + 1))
done
echo ""
echo "All $DRONE_COUNT instances launched. Press Ctrl-C to stop."
wait
+32
View File
@@ -0,0 +1,32 @@
# Formation config for SIH swarm simulation
# Used by sih_swarm_run.sh and swarm_ros2 coordinator
origin:
lat: 47.397742
lon: 8.545594
alt: 488.0
formation:
# NED offsets from origin (meters). 4x4 grid, 1m spacing, at 20m altitude
- {id: 0, x: 0.0, y: 0.0, z: -20.0}
- {id: 1, x: 0.0, y: 1.0, z: -20.0}
- {id: 2, x: 0.0, y: 2.0, z: -20.0}
- {id: 3, x: 0.0, y: 3.0, z: -20.0}
- {id: 4, x: 1.0, y: 0.0, z: -20.0}
- {id: 5, x: 1.0, y: 1.0, z: -20.0}
- {id: 6, x: 1.0, y: 2.0, z: -20.0}
- {id: 7, x: 1.0, y: 3.0, z: -20.0}
- {id: 8, x: 2.0, y: 0.0, z: -20.0}
- {id: 9, x: 2.0, y: 1.0, z: -20.0}
- {id: 10, x: 2.0, y: 2.0, z: -20.0}
- {id: 11, x: 2.0, y: 3.0, z: -20.0}
- {id: 12, x: 3.0, y: 0.0, z: -20.0}
- {id: 13, x: 3.0, y: 1.0, z: -20.0}
- {id: 14, x: 3.0, y: 2.0, z: -20.0}
- {id: 15, x: 3.0, y: 3.0, z: -20.0}
target:
# Car position: 200m north of origin, ground level
x: 200.0
y: 0.0
z: 0.0
+485
View File
@@ -0,0 +1,485 @@
#!/usr/bin/env python3
"""
Swarm Formation Designer - Create and preview formation YAML files for PX4 SIH swarm simulation.
Generates swarm_formation.yaml files with preset formation patterns (grid, circle, line,
v, diamond, random) for use with sih_swarm_run.sh and swarm_ros2 coordinator.
Usage examples:
# 16-drone grid with 2m spacing, preview in terminal
python3 swarm_formation_designer.py --pattern grid --count 16 --spacing 2.0 --preview
# 8-drone circle, radius 10m, write to file
python3 swarm_formation_designer.py --pattern circle --count 8 --radius 10 -o formation.yaml
# V formation with custom origin and target
python3 swarm_formation_designer.py --pattern v --count 7 --spacing 3.0 \\
--origin-lat 37.4 --origin-lon -122.0 --target-north 150 --target-east 50
"""
import argparse
import math
import random
import sys
from typing import List, Tuple
# Defaults matching PX4 SIH test site (Zurich area)
DEFAULT_LAT = 47.397742
DEFAULT_LON = 8.545594
DEFAULT_ALT = 488.0
DEFAULT_ALTITUDE = 20.0
DEFAULT_SPACING = 1.0
DEFAULT_RADIUS = 5.0
DEFAULT_COUNT = 16
DEFAULT_TARGET_NORTH = 200.0
DEFAULT_TARGET_EAST = 0.0
MIN_SEPARATION = 0.5 # meters, minimum allowed distance between any two drones
def generate_grid(count: int, spacing: float, altitude: float) -> List[dict]:
"""Arrange drones in a square grid pattern (ceil(sqrt(count)) columns)."""
cols = math.ceil(math.sqrt(count))
positions = []
for i in range(count):
row = i // cols
col = i % cols
positions.append({
"id": i,
"x": round(row * spacing, 2),
"y": round(col * spacing, 2),
"z": round(-altitude, 2),
})
return positions
def generate_circle(count: int, radius: float, altitude: float) -> List[dict]:
"""Arrange drones evenly spaced around a circle."""
positions = []
for i in range(count):
angle = 2.0 * math.pi * i / count
positions.append({
"id": i,
"x": round(radius * math.cos(angle), 2),
"y": round(radius * math.sin(angle), 2),
"z": round(-altitude, 2),
})
return positions
def generate_line(count: int, spacing: float, altitude: float) -> List[dict]:
"""Arrange drones in a line along the east (y) axis, centered at origin."""
positions = []
offset = (count - 1) * spacing / 2.0
for i in range(count):
positions.append({
"id": i,
"x": 0.0,
"y": round(i * spacing - offset, 2),
"z": round(-altitude, 2),
})
return positions
def generate_v(count: int, spacing: float, altitude: float) -> List[dict]:
"""Classic V formation with leader at front (northernmost position).
Leader is id 0 at the tip. Remaining drones alternate left/right,
each row stepping back (south) and outward (east/west).
"""
positions = [{"id": 0, "x": 0.0, "y": 0.0, "z": round(-altitude, 2)}]
wing_index = 1
for i in range(1, count):
side = 1 if (i % 2 == 1) else -1
row = (i + 1) // 2
positions.append({
"id": wing_index,
"x": round(-row * spacing, 2), # step back (south)
"y": round(side * row * spacing, 2), # spread east/west
"z": round(-altitude, 2),
})
wing_index += 1
return positions
def generate_diamond(count: int, spacing: float, altitude: float) -> List[dict]:
"""Diamond/rhombus formation.
Builds concentric diamond rings outward from the center. Ring k has 4*k
positions (except the center which has 1). Drones fill from center outward.
"""
# Generate diamond coordinates ring by ring
coords: List[Tuple[float, float]] = [(0.0, 0.0)]
ring = 1
while len(coords) < count:
# Each ring has 4 sides, each side has 'ring' segments
for side in range(4):
for step in range(ring):
if side == 0: # top-right edge
x = (ring - step) * spacing
y = step * spacing
elif side == 1: # bottom-right edge
x = -step * spacing
y = (ring - step) * spacing
elif side == 2: # bottom-left edge
x = -(ring - step) * spacing
y = -step * spacing
else: # top-left edge
x = step * spacing
y = -(ring - step) * spacing
coords.append((round(x, 2), round(y, 2)))
if len(coords) >= count:
break
if len(coords) >= count:
break
ring += 1
positions = []
for i in range(count):
positions.append({
"id": i,
"x": coords[i][0],
"y": coords[i][1],
"z": round(-altitude, 2),
})
return positions
def generate_random(count: int, spacing: float, altitude: float) -> List[dict]:
"""Random positions within a bounding box, enforcing minimum separation.
The bounding box size scales with count and spacing. Uses rejection
sampling to guarantee minimum separation between all drones.
"""
box_size = max(spacing * math.sqrt(count) * 2, spacing * 4)
positions = []
coords: List[Tuple[float, float]] = []
max_attempts = count * 1000
rng = random.Random(42) # deterministic for reproducibility
attempts = 0
while len(coords) < count and attempts < max_attempts:
x = round(rng.uniform(-box_size / 2, box_size / 2), 2)
y = round(rng.uniform(-box_size / 2, box_size / 2), 2)
too_close = False
for cx, cy in coords:
if math.sqrt((x - cx) ** 2 + (y - cy) ** 2) < MIN_SEPARATION:
too_close = True
break
if not too_close:
coords.append((x, y))
attempts += 1
if len(coords) < count:
print(
f"Warning: could only place {len(coords)}/{count} drones "
f"with minimum separation {MIN_SEPARATION}m in bounding box {box_size:.1f}m",
file=sys.stderr,
)
for i, (x, y) in enumerate(coords):
positions.append({
"id": i,
"x": x,
"y": y,
"z": round(-altitude, 2),
})
return positions
PATTERN_GENERATORS = {
"grid": generate_grid,
"circle": generate_circle,
"line": generate_line,
"v": generate_v,
"diamond": generate_diamond,
"random": generate_random,
}
def validate_no_overlap(positions: List[dict]) -> bool:
"""Check that no two drones occupy the same horizontal position (within MIN_SEPARATION)."""
for i in range(len(positions)):
for j in range(i + 1, len(positions)):
dx = positions[i]["x"] - positions[j]["x"]
dy = positions[i]["y"] - positions[j]["y"]
dist = math.sqrt(dx * dx + dy * dy)
if dist < MIN_SEPARATION:
print(
f"Error: drones {positions[i]['id']} and {positions[j]['id']} "
f"are only {dist:.2f}m apart (minimum: {MIN_SEPARATION}m)",
file=sys.stderr,
)
return False
return True
def build_formation_dict(
positions: List[dict],
origin_lat: float,
origin_lon: float,
origin_alt: float,
target_north: float,
target_east: float,
) -> dict:
"""Build the formation dictionary matching swarm_formation.yaml structure."""
return {
"origin": {
"lat": origin_lat,
"lon": origin_lon,
"alt": origin_alt,
},
"formation": positions,
"target": {
"x": target_north,
"y": target_east,
"z": 0.0,
},
}
def format_yaml(data: dict) -> str:
"""Format the formation data as YAML with flow-style formation entries.
Produces output matching the established swarm_formation.yaml style with
inline dicts for formation entries and block style for origin/target.
"""
lines = [
"# Formation config for SIH swarm simulation",
"# Generated by swarm_formation_designer.py",
"",
"origin:",
f" lat: {data['origin']['lat']}",
f" lon: {data['origin']['lon']}",
f" alt: {data['origin']['alt']}",
"",
"formation:",
]
# Determine padding width for aligned id field
max_id = max(p["id"] for p in data["formation"])
id_width = len(str(max_id))
for pos in data["formation"]:
id_str = str(pos["id"]).rjust(id_width)
lines.append(
f" - {{id: {id_str}, "
f"x: {pos['x']}, "
f"y: {pos['y']}, "
f"z: {pos['z']}}}"
)
lines.extend([
"",
"target:",
f" x: {data['target']['x']}",
f" y: {data['target']['y']}",
f" z: {data['target']['z']}",
"",
])
return "\n".join(lines)
def ascii_preview(positions: List[dict], target_north: float, target_east: float) -> str:
"""Render an ASCII top-down map showing drone positions and target.
North is up, East is right. Axes are labeled. The target is shown as 'T'
and drones as their id number (or '*' if id >= 10).
"""
width = 60
height = 30
# Collect all points (drones + target)
all_x = [p["x"] for p in positions] + [target_north]
all_y = [p["y"] for p in positions] + [target_east]
min_x, max_x = min(all_x), max(all_x)
min_y, max_y = min(all_y), max(all_y)
# Add margin
range_x = max_x - min_x if max_x != min_x else 1.0
range_y = max_y - min_y if max_y != min_y else 1.0
margin = 0.1
min_x -= range_x * margin
max_x += range_x * margin
min_y -= range_y * margin
max_y += range_y * margin
range_x = max_x - min_x
range_y = max_y - min_y
def to_grid(north: float, east: float) -> Tuple[int, int]:
col = int((east - min_y) / range_y * (width - 1))
row = int((max_x - north) / range_x * (height - 1)) # north is up
col = max(0, min(width - 1, col))
row = max(0, min(height - 1, row))
return row, col
# Initialize grid
grid = [[" " for _ in range(width)] for _ in range(height)]
# Place target
tr, tc = to_grid(target_north, target_east)
grid[tr][tc] = "T"
# Place drones (overwrite target if on same cell)
for pos in positions:
r, c = to_grid(pos["x"], pos["y"])
label = str(pos["id"]) if pos["id"] < 10 else "*"
grid[r][c] = label
# Build output
out = []
out.append(f" Formation Preview ({len(positions)} drones)")
out.append(f" North ^ (x range: {min_x:.1f} to {max_x:.1f} m)")
out.append(" " + "-" * (width + 2))
for row in grid:
out.append(" |" + "".join(row) + "|")
out.append(" " + "-" * (width + 2))
out.append(f" East -> (y range: {min_y:.1f} to {max_y:.1f} m)")
out.append("")
out.append(" Legend: 0-9 = drone id, * = drone id >= 10, T = target")
return "\n".join(out)
def parse_args(argv: "List[str] | None" = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Create and preview swarm formation YAML files for PX4 SIH simulation.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Examples:\n"
" %(prog)s --pattern grid --count 16 --spacing 2 --preview\n"
" %(prog)s --pattern circle --count 8 --radius 10 -o my_formation.yaml\n"
" %(prog)s --pattern v --count 7 --spacing 3 --preview -o v_formation.yaml\n"
),
)
parser.add_argument(
"--pattern",
choices=list(PATTERN_GENERATORS.keys()),
default="grid",
help="Formation pattern (default: grid)",
)
parser.add_argument(
"--count",
type=int,
default=DEFAULT_COUNT,
help=f"Number of drones (default: {DEFAULT_COUNT})",
)
parser.add_argument(
"--spacing",
type=float,
default=DEFAULT_SPACING,
help=f"Spacing between drones in meters (default: {DEFAULT_SPACING})",
)
parser.add_argument(
"--altitude",
type=float,
default=DEFAULT_ALTITUDE,
help=f"Flight altitude in meters AGL (default: {DEFAULT_ALTITUDE})",
)
parser.add_argument(
"--radius",
type=float,
default=DEFAULT_RADIUS,
help=f"Radius for circle pattern in meters (default: {DEFAULT_RADIUS})",
)
parser.add_argument(
"--origin-lat",
type=float,
default=DEFAULT_LAT,
help=f"Origin latitude (default: {DEFAULT_LAT})",
)
parser.add_argument(
"--origin-lon",
type=float,
default=DEFAULT_LON,
help=f"Origin longitude (default: {DEFAULT_LON})",
)
parser.add_argument(
"--origin-alt",
type=float,
default=DEFAULT_ALT,
help=f"Origin altitude AMSL in meters (default: {DEFAULT_ALT})",
)
parser.add_argument(
"--target-north",
type=float,
default=DEFAULT_TARGET_NORTH,
help=f"Target position north of origin in meters (default: {DEFAULT_TARGET_NORTH})",
)
parser.add_argument(
"--target-east",
type=float,
default=DEFAULT_TARGET_EAST,
help=f"Target position east of origin in meters (default: {DEFAULT_TARGET_EAST})",
)
parser.add_argument(
"--preview",
action="store_true",
help="Show ASCII preview of the formation",
)
parser.add_argument(
"-o", "--output",
type=str,
default=None,
help="Output YAML file path (default: print to stdout)",
)
args = parser.parse_args(argv)
if args.count < 1:
parser.error("--count must be at least 1")
if args.spacing <= 0:
parser.error("--spacing must be positive")
if args.altitude <= 0:
parser.error("--altitude must be positive")
if args.radius <= 0:
parser.error("--radius must be positive")
return args
def main(argv: "List[str] | None" = None) -> int:
args = parse_args(argv)
# Select generator and dispatch with appropriate arguments
generator = PATTERN_GENERATORS[args.pattern]
if args.pattern == "circle":
positions = generator(args.count, args.radius, args.altitude)
else:
positions = generator(args.count, args.spacing, args.altitude)
# Validate
if not validate_no_overlap(positions):
return 1
# Build output
data = build_formation_dict(
positions,
args.origin_lat,
args.origin_lon,
args.origin_alt,
args.target_north,
args.target_east,
)
yaml_text = format_yaml(data)
# Preview
if args.preview:
print(ascii_preview(positions, args.target_north, args.target_east))
print()
# Output
if args.output:
with open(args.output, "w") as f:
f.write(yaml_text)
print(f"Wrote {len(positions)} drone formation to {args.output}")
else:
if not args.preview:
print(yaml_text, end="")
else:
print(yaml_text, end="")
return 0
if __name__ == "__main__":
sys.exit(main())
+2
View File
@@ -0,0 +1,2 @@
# Staging directory created by build_docker.sh (ephemeral)
_px4_msgs_defs/
+95
View File
@@ -0,0 +1,95 @@
# ROS 2 Jazzy layer on top of PX4 dev environment
# For swarm simulation with px4-ros2-interface-lib
#
# Build (from PX4-Autopilot root):
# Tools/simulation/swarm_ros2/build_docker.sh
#
# Run:
# docker run -it --rm -v /path/to/PX4-Autopilot:/px4 px4-swarm-ros2
#
FROM px4io/px4-dev:v1.17.0-rc1
LABEL maintainer="Ramon Roche <mrpollo@gmail.com>"
ENV DEBIAN_FRONTEND=noninteractive
ENV ROS_DISTRO=jazzy
# Add ROS 2 Jazzy repository (Ubuntu 24.04 = noble)
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
gnupg \
lsb-release \
software-properties-common \
&& curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key \
-o /usr/share/keyrings/ros-archive-keyring.gpg \
&& echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] \
http://packages.ros.org/ros2/ubuntu noble main" \
> /etc/apt/sources.list.d/ros2.list \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
ros-jazzy-ros-base \
ros-jazzy-rmw-cyclonedds-cpp \
python3-colcon-common-extensions \
python3-rosdep \
python3-numpy \
&& rm -rf /var/lib/apt/lists/*
# Initialize rosdep
RUN rosdep init 2>/dev/null || true \
&& su user -c "rosdep update --rosdistro jazzy" 2>/dev/null || true
# Set ROS 2 middleware to CycloneDDS (better multi-vehicle support)
ENV RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
# Build Micro XRCE-DDS Agent from source
RUN git clone --depth 1 https://github.com/eProsima/Micro-XRCE-DDS-Agent.git /tmp/xrce-agent \
&& cd /tmp/xrce-agent \
&& mkdir build && cd build \
&& cmake .. -DCMAKE_BUILD_TYPE=Release \
&& make -j$(nproc) \
&& make install \
&& ldconfig \
&& rm -rf /tmp/xrce-agent
# Build px4_msgs from PX4 source tree msg definitions (ensures type hash match).
# The build_docker.sh script copies msg/ and srv/ into _px4_msgs_defs/ before build.
ENV ROS2_WS=/opt/ros2_ws
COPY _px4_msgs_defs/ /tmp/px4_msg_defs/
RUN mkdir -p ${ROS2_WS}/src \
&& cd ${ROS2_WS}/src \
&& git clone --depth 1 https://github.com/PX4/px4_msgs.git \
&& rm -rf px4_msgs/msg/*.msg px4_msgs/srv/*.srv \
&& cp /tmp/px4_msg_defs/msg/*.msg px4_msgs/msg/ \
&& cp /tmp/px4_msg_defs/srv/*.srv px4_msgs/srv/ \
&& rm -rf /tmp/px4_msg_defs \
&& cd ${ROS2_WS} \
&& . /opt/ros/jazzy/setup.sh \
&& colcon build --packages-select px4_msgs --cmake-args -DCMAKE_BUILD_TYPE=Release \
&& rm -rf log
# Build px4-ros2-interface-lib (depends on px4_msgs built above)
RUN cd ${ROS2_WS}/src \
&& git clone --depth 1 --recursive https://github.com/Auterion/px4-ros2-interface-lib.git \
&& cd ${ROS2_WS} \
&& . /opt/ros/jazzy/setup.sh \
&& . install/setup.sh \
&& colcon build --packages-skip px4_msgs --cmake-args -DCMAKE_BUILD_TYPE=Release \
&& rm -rf log src
# Copy swarm_ros2 package and build it
COPY . ${ROS2_WS}/src/swarm_ros2/
RUN cd ${ROS2_WS} \
&& . /opt/ros/jazzy/setup.sh \
&& . install/setup.sh \
&& colcon build --packages-select swarm_ros2 \
&& rm -rf log
# Source everything on shell entry
RUN echo "source /opt/ros/jazzy/setup.bash" >> /etc/bash.bashrc \
&& echo "source ${ROS2_WS}/install/setup.bash" >> /etc/bash.bashrc
# XRCE-DDS UDP port
EXPOSE 8888/udp
WORKDIR ${ROS2_WS}
CMD ["/bin/bash"]
+35
View File
@@ -0,0 +1,35 @@
#!/bin/bash
# Build the px4-swarm-ros2 Docker image with px4_msgs matching this PX4 commit.
# Run from anywhere; the script locates PX4 root automatically.
#
# Usage: ./build_docker.sh
set -eo pipefail
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
PX4_ROOT="$( cd "${SCRIPT_DIR}/../../.." && pwd )"
echo "PX4 root: ${PX4_ROOT}"
echo "Staging px4_msgs definitions from this PX4 commit..."
# Stage msg/srv files into the Docker build context
STAGING_DIR="${SCRIPT_DIR}/_px4_msgs_defs"
rm -rf "${STAGING_DIR}"
mkdir -p "${STAGING_DIR}/msg" "${STAGING_DIR}/srv"
cp "${PX4_ROOT}"/msg/*.msg "${STAGING_DIR}/msg/"
cp "${PX4_ROOT}"/msg/versioned/*.msg "${STAGING_DIR}/msg/"
cp "${PX4_ROOT}"/srv/*.srv "${STAGING_DIR}/srv/"
echo "Staged $(ls "${STAGING_DIR}/msg/" | wc -l | tr -d ' ') .msg files and $(ls "${STAGING_DIR}/srv/" | wc -l | tr -d ' ') .srv files"
echo ""
echo "Building Docker image..."
docker build -t px4-swarm-ros2 "${SCRIPT_DIR}"
# Clean up staged files
rm -rf "${STAGING_DIR}"
echo ""
echo "Done. Run with:"
echo " docker run -it --rm -v ${PX4_ROOT}:/px4 px4-swarm-ros2 bash /px4/Tools/simulation/swarm_ros2/run_swarm.sh 4"
@@ -0,0 +1,80 @@
"""
Launch file for the PX4 swarm coordination stack.
Starts one vision_provider node per drone and a single swarm_controller node.
Usage:
ros2 launch swarm_ros2 swarm_launch.py num_drones:=16 speed_factor:=1.0
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
# Declare launch arguments.
num_drones_arg = DeclareLaunchArgument(
'num_drones',
default_value='16',
description='Number of drones in the swarm',
)
speed_factor_arg = DeclareLaunchArgument(
'speed_factor',
default_value='1.0',
description='Simulation speed factor',
)
formation_config_arg = DeclareLaunchArgument(
'formation_config',
default_value='',
description='Path to YAML formation configuration file',
)
num_drones = LaunchConfiguration('num_drones')
speed_factor = LaunchConfiguration('speed_factor')
# We need a concrete integer to iterate, so we support a fixed max and
# conditionally launch. ROS 2 launch does not natively support dynamic
# loops over LaunchConfiguration integers, so we generate nodes for a
# reasonable upper bound and rely on the controller's num_drones param
# to limit actual usage. For simplicity, generate exactly 16 nodes
# (the default). For other counts, users can call this launch file
# programmatically or adjust the constant below.
MAX_DRONES = 16
vision_nodes = []
for i in range(MAX_DRONES):
ns = f'px4_{i}'
vision_nodes.append(
Node(
package='swarm_ros2',
executable='vision_provider',
name=f'vision_provider_{i}',
parameters=[{
'namespace': ns,
'instance_id': i,
}],
output='screen',
)
)
# Single swarm controller.
controller_node = Node(
package='swarm_ros2',
executable='swarm_controller',
name='swarm_controller',
parameters=[{
'num_drones': num_drones,
'speed_factor': speed_factor,
}],
output='screen',
)
return LaunchDescription([
num_drones_arg,
speed_factor_arg,
formation_config_arg,
*vision_nodes,
controller_node,
])
+24
View File
@@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>swarm_ros2</name>
<version>0.1.0</version>
<description>Swarm coordination for PX4 multi-vehicle simulation using px4-ros2-interface-lib concepts</description>
<maintainer email="mrpollo@gmail.com">Ramon Roche</maintainer>
<license>BSD-3-Clause</license>
<depend>rclpy</depend>
<depend>px4_msgs</depend>
<depend>px4_ros2_cpp</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
+99
View File
@@ -0,0 +1,99 @@
#!/bin/bash
# Run the full swarm test inside Docker: PX4 SIH + XRCE-DDS + ROS 2
# Usage: ./run_swarm.sh [NUM_DRONES] [SPEED_FACTOR]
#
# Must be run inside the px4-swarm-ros2 container with PX4 source mounted at /px4:
# docker run -it --rm -v /path/to/PX4-Autopilot:/px4 px4-swarm-ros2 /opt/ros2_ws/src/swarm_ros2/run_swarm.sh 4
set -eo pipefail
NUM_DRONES="${1:-4}"
SPEED_FACTOR="${2:-1}"
PX4_SRC="${PX4_SRC:-/px4}"
BUILD_DIR="${PX4_SRC}/build/px4_sitl_sih"
echo "=== PX4 SIH Swarm Test ==="
echo " Drones: $NUM_DRONES"
echo " Speed factor: $SPEED_FACTOR"
echo ""
# Check PX4 build exists
if [ ! -f "$BUILD_DIR/bin/px4" ]; then
echo "PX4 binary not found at $BUILD_DIR/bin/px4"
echo "Building PX4 (this may take a few minutes)..."
cd "$PX4_SRC"
make px4_sitl_sih
fi
# Source ROS 2
source /opt/ros/jazzy/setup.bash
source /opt/ros2_ws/install/setup.bash
# Cleanup handler
PIDS=()
cleanup() {
echo ""
echo "Shutting down..."
for pid in "${PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
pkill -x px4 2>/dev/null || true
pkill -x MicroXRCEAgent 2>/dev/null || true
wait 2>/dev/null
echo "Done."
}
trap cleanup EXIT SIGINT SIGTERM
# Kill any existing px4
pkill -x px4 2>/dev/null || true
sleep 1
# Start XRCE-DDS agent
echo "Starting XRCE-DDS agent on port 8888..."
MicroXRCEAgent udp4 -p 8888 > /dev/null 2>&1 &
PIDS+=($!)
sleep 1
# Launch PX4 instances
export PX4_SIM_MODEL=sihsim_quadx_vision
export PX4_SIM_SPEED_FACTOR="$SPEED_FACTOR"
# Clean stale instance data (old parameters.bson can override param set-default)
echo "Cleaning stale instance data..."
for n in $(seq 0 $((NUM_DRONES - 1))); do
rm -rf "$BUILD_DIR/instance_$n"
done
echo "Launching $NUM_DRONES PX4 SIH instances..."
for n in $(seq 0 $((NUM_DRONES - 1))); do
working_dir="$BUILD_DIR/instance_$n"
mkdir -p "$working_dir"
pushd "$working_dir" &>/dev/null
# Force consistent DDS namespace for all instances (including 0)
# so ROS 2 nodes can use px4_0, px4_1, ... uniformly
PX4_UXRCE_DDS_NS="px4_$n" "$BUILD_DIR/bin/px4" -i "$n" -d "$BUILD_DIR/etc" >out.log 2>err.log &
PIDS+=($!)
popd &>/dev/null
echo " Instance $n started (PID ${PIDS[-1]})"
done
# Wait for PX4 instances to initialize
echo ""
echo "Waiting 5s for PX4 instances to initialize..."
sleep 5
# Launch ROS 2 swarm (blocks until mission completes)
echo ""
echo "Launching ROS 2 swarm controller..."
echo "=========================================="
ros2 launch swarm_ros2 swarm_launch.py num_drones:="$NUM_DRONES"
echo ""
echo "=== Mission complete ==="
# Collect ULogs
echo "Collecting ULog files..."
python3 "$PX4_SRC/Tools/simulation/collect_swarm_ulogs.py" \
--build-dir "$BUILD_DIR" \
--output-dir "$PX4_SRC/swarm_ulogs_$(date +%Y%m%d_%H%M%S)"
+5
View File
@@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/swarm_ros2
[install]
install_scripts=$base/lib/swarm_ros2
+29
View File
@@ -0,0 +1,29 @@
from setuptools import setup
import os
from glob import glob
package_name = 'swarm_ros2'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Ramon Roche',
maintainer_email='mrpollo@gmail.com',
description='Swarm coordination for PX4 multi-vehicle simulation',
license='BSD-3-Clause',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'vision_provider = swarm_ros2.vision_provider:main',
'swarm_controller = swarm_ros2.swarm_controller:main',
],
},
)
@@ -0,0 +1,599 @@
"""
Swarm controller node for PX4 multi-vehicle coordination.
Manages all drones through a phased state machine:
1. Wait for EV lock
2. Arm + takeoff to formation
3. Hold grid formation
4. Transition to circle via cosine interpolation
5. Hold circle and rotate
6. Kamikaze dive to target
Uses PX4 offboard control via px4_msgs topics.
"""
import math
import os
from enum import IntEnum, auto
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
from px4_msgs.msg import (
OffboardControlMode,
TrajectorySetpoint,
VehicleCommand,
VehicleLocalPosition,
VehicleStatus,
)
PX4_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# Volatile QoS for subscriptions to avoid stale data from before
# namespace discovery is complete.
PX4_SUB_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
class Phase(IntEnum):
"""State machine phases."""
WAIT_EV_LOCK = auto()
ARM_TAKEOFF = auto()
HOLD_GRID = auto()
TRANSITION_CIRCLE = auto()
HOLD_CIRCLE = auto()
KAMIKAZE = auto()
DONE = auto()
class DroneState:
"""Per-drone bookkeeping."""
def __init__(self, drone_id: int, namespace: str):
self.drone_id = drone_id
self.namespace = namespace
# Telemetry
self.xy_valid = False
self.z_valid = False
self.position = np.zeros(3) # NED [x, y, z]
self.has_initial_pos = False
self._init_samples: list = [] # Buffer for initial position consensus
self.armed = False
self.nav_state = 0
# Formation target (grid)
self.grid_pos = np.zeros(3)
# Circle target
self.circle_angle = 0.0
# Mission status
self.impacted = False
class SwarmController(Node):
"""Coordinates N drones through the swarm mission phases."""
# Phase durations in seconds
EV_LOCK_TIMEOUT = 15.0
TAKEOFF_DURATION = 20.0
HOLD_GRID_DURATION = 10.0
TRANSITION_DURATION = 15.0
HOLD_CIRCLE_DURATION = 10.0
KAMIKAZE_TIMEOUT = 120.0
# Circle parameters
CIRCLE_RADIUS = 4.0 # metres
CIRCLE_SPEED = 3.0 # m/s tangential
# Kamikaze parameters
KAMIKAZE_SPEED = 12.0 # m/s (MPC_XY_VEL_MAX)
IMPACT_DIST = 2.0 # metres
IMPACT_Z_THRESH = -1.0 # NED (z > -1 means within 1m of ground)
# PX4 command constants
VEHICLE_CMD_COMPONENT_ARM_DISARM = 400
VEHICLE_CMD_DO_SET_MODE = 176
CONTROL_HZ = 20.0
def __init__(self):
super().__init__('swarm_controller')
# Parameters
self.declare_parameter('num_drones', 16)
self.declare_parameter('speed_factor', 1.0)
self.declare_parameter('target_x', 200.0)
self.declare_parameter('target_y', 0.0)
self.declare_parameter('target_z', 0.0)
self.declare_parameter('takeoff_altitude', -5.0) # NED, negative = up
self._num_drones = self.get_parameter('num_drones').value
self._speed_factor = self.get_parameter('speed_factor').value
# Kamikaze target (NED)
self._target = np.array([
float(os.environ.get('PX4_KAMIKAZE_X',
self.get_parameter('target_x').value)),
float(os.environ.get('PX4_KAMIKAZE_Y',
self.get_parameter('target_y').value)),
float(os.environ.get('PX4_KAMIKAZE_Z',
self.get_parameter('target_z').value)),
])
takeoff_alt = self.get_parameter('takeoff_altitude').value
self.get_logger().info(
f'Swarm controller: {self._num_drones} drones, '
f'speed_factor={self._speed_factor}, '
f'target={self._target.tolist()}'
)
# Build grid formation (4x4 or NxM, 2m spacing, centered at origin).
cols = int(math.ceil(math.sqrt(self._num_drones)))
rows = int(math.ceil(self._num_drones / cols))
grid_center_x = (cols - 1) / 2.0
grid_center_y = (rows - 1) / 2.0
# ------------------------------------------------------------------ #
# Per-drone state and ROS interfaces
# ------------------------------------------------------------------ #
self._drones: list[DroneState] = []
self._pub_offboard: list[rclpy.publisher.Publisher] = []
self._pub_traj: list[rclpy.publisher.Publisher] = []
self._pub_cmd: list[rclpy.publisher.Publisher] = []
for i in range(self._num_drones):
ns = f'px4_{i}'
drone = DroneState(i, ns)
# Grid position: 2m spacing, centered, at takeoff altitude.
col = i % cols
row = i // cols
drone.grid_pos = np.array([
(col - grid_center_x) * 2.0,
(row - grid_center_y) * 2.0,
float(takeoff_alt),
])
# Circle angle for this drone.
drone.circle_angle = 2.0 * math.pi * i / self._num_drones
self._drones.append(drone)
# Publishers
self._pub_offboard.append(
self.create_publisher(
OffboardControlMode,
f'/{ns}/fmu/in/offboard_control_mode',
PX4_QOS,
)
)
self._pub_traj.append(
self.create_publisher(
TrajectorySetpoint,
f'/{ns}/fmu/in/trajectory_setpoint',
PX4_QOS,
)
)
self._pub_cmd.append(
self.create_publisher(
VehicleCommand,
f'/{ns}/fmu/in/vehicle_command',
PX4_QOS,
)
)
# Subscribers
# PX4 XRCE-DDS uses versioned topic names: _v<MESSAGE_VERSION>
# VehicleLocalPosition: MESSAGE_VERSION=1 -> _v1
# VehicleStatus: MESSAGE_VERSION=2 -> _v2
self.create_subscription(
VehicleLocalPosition,
f'/{ns}/fmu/out/vehicle_local_position_v1',
lambda msg, idx=i: self._local_pos_cb(idx, msg),
PX4_SUB_QOS,
)
self.create_subscription(
VehicleStatus,
f'/{ns}/fmu/out/vehicle_status_v2',
lambda msg, idx=i: self._vehicle_status_cb(idx, msg),
PX4_SUB_QOS,
)
# ------------------------------------------------------------------ #
# State machine
# ------------------------------------------------------------------ #
self._phase = Phase.WAIT_EV_LOCK
self._phase_start_time = self.get_clock().now()
self._mission_start_time = self.get_clock().now()
self._circle_center = np.array([0.0, 0.0, float(takeoff_alt)])
self._last_ev_print = 0
# Main control timer
period_s = 1.0 / self.CONTROL_HZ
self._timer = self.create_timer(period_s, self._control_loop)
# ===================================================================== #
# Subscriber callbacks
# ===================================================================== #
def _local_pos_cb(self, idx: int, msg: VehicleLocalPosition):
d = self._drones[idx]
d.xy_valid = msg.xy_valid
d.z_valid = msg.z_valid
new_pos = np.array([msg.x, msg.y, msg.z])
if not d.has_initial_pos:
if not (d.xy_valid and d.z_valid):
return
# All drones start near origin. Reject samples > 20m from origin
# during initialization (DDS cross-talk from other instances).
if np.linalg.norm(new_pos) > 20.0:
return
# Require 5 consistent samples within 5m to establish initial pos.
d._init_samples.append(new_pos)
if len(d._init_samples) >= 5:
ref = d._init_samples[-1]
consistent = all(
np.linalg.norm(s - ref) < 5.0
for s in d._init_samples[-5:]
)
if consistent:
d.position = new_pos
d.has_initial_pos = True
else:
d._init_samples = d._init_samples[-1:]
return
# Guard against DDS cross-talk: reject position jumps > 50m/tick.
# At 50Hz and 12m/s max, real movement is < 0.3m per tick.
jump = np.linalg.norm(new_pos - d.position)
if jump > 50.0:
return # Discard cross-talk sample
d.position = new_pos
def _vehicle_status_cb(self, idx: int, msg: VehicleStatus):
d = self._drones[idx]
d.armed = (msg.arming_state == VehicleStatus.ARMING_STATE_ARMED)
d.nav_state = msg.nav_state
# ===================================================================== #
# Publishing helpers
# ===================================================================== #
def _publish_offboard_mode(self, idx: int, *, position: bool = False,
velocity: bool = False):
"""Send OffboardControlMode for drone idx."""
msg = OffboardControlMode()
msg.timestamp = int(self.get_clock().now().nanoseconds / 1000)
msg.position = position
msg.velocity = velocity
msg.acceleration = False
msg.attitude = False
msg.body_rate = False
self._pub_offboard[idx].publish(msg)
def _publish_trajectory(self, idx: int, *,
position: list[float] | None = None,
velocity: list[float] | None = None,
yaw: float = float('nan'),
yawspeed: float = float('nan')):
"""Send TrajectorySetpoint for drone idx."""
msg = TrajectorySetpoint()
msg.timestamp = int(self.get_clock().now().nanoseconds / 1000)
nan3 = [float('nan')] * 3
msg.position = list(position) if position is not None else nan3
msg.velocity = list(velocity) if velocity is not None else nan3
msg.yaw = yaw
msg.yawspeed = yawspeed
self._pub_traj[idx].publish(msg)
def _publish_vehicle_command(self, idx: int, command: int,
param1: float = 0.0, param2: float = 0.0,
param3: float = 0.0):
"""Send VehicleCommand for drone idx."""
msg = VehicleCommand()
msg.timestamp = int(self.get_clock().now().nanoseconds / 1000)
msg.command = command
msg.param1 = param1
msg.param2 = param2
msg.param3 = param3
msg.target_system = idx + 1 # MAV_SYS_ID
msg.target_component = 1
msg.source_system = 255 # GCS
msg.source_component = 0
msg.from_external = True
self._pub_cmd[idx].publish(msg)
def _arm(self, idx: int):
self._publish_vehicle_command(
idx, self.VEHICLE_CMD_COMPONENT_ARM_DISARM, param1=1.0)
def _disarm_force(self, idx: int):
self._publish_vehicle_command(
idx, self.VEHICLE_CMD_COMPONENT_ARM_DISARM,
param1=0.0, param2=21196.0)
def _set_offboard_mode(self, idx: int):
self._publish_vehicle_command(
idx, self.VEHICLE_CMD_DO_SET_MODE,
param1=1.0, param2=6.0, param3=0.0)
# ===================================================================== #
# Phase elapsed time helper
# ===================================================================== #
def _phase_elapsed(self) -> float:
"""Seconds since current phase started."""
return (self.get_clock().now() - self._phase_start_time).nanoseconds / 1e9
def _mission_elapsed(self) -> float:
"""Seconds since mission started."""
return (self.get_clock().now() - self._mission_start_time).nanoseconds / 1e9
def _advance_phase(self, new_phase: Phase):
self.get_logger().info(f'Phase transition: {self._phase.name} -> {new_phase.name}')
self._phase = new_phase
self._phase_start_time = self.get_clock().now()
# ===================================================================== #
# Main control loop (20 Hz)
# ===================================================================== #
def _control_loop(self):
if self._phase == Phase.DONE:
return
if self._phase == Phase.WAIT_EV_LOCK:
self._phase_wait_ev_lock()
elif self._phase == Phase.ARM_TAKEOFF:
self._phase_arm_takeoff()
elif self._phase == Phase.HOLD_GRID:
self._phase_hold_grid()
elif self._phase == Phase.TRANSITION_CIRCLE:
self._phase_transition_circle()
elif self._phase == Phase.HOLD_CIRCLE:
self._phase_hold_circle()
elif self._phase == Phase.KAMIKAZE:
self._phase_kamikaze()
# ------------------------------------------------------------------ #
# Phase 1: Wait for EV lock
# ------------------------------------------------------------------ #
def _phase_wait_ev_lock(self):
locked = sum(1 for d in self._drones if d.xy_valid and d.z_valid)
elapsed = self._phase_elapsed()
# Print status every second.
elapsed_int = int(elapsed)
if elapsed_int > self._last_ev_print:
self._last_ev_print = elapsed_int
self.get_logger().info(
f'{locked}/{self._num_drones} drones have EV lock '
f'({elapsed:.0f}s elapsed)')
if locked == self._num_drones or elapsed > self.EV_LOCK_TIMEOUT:
if locked < self._num_drones:
self.get_logger().warn(
f'EV lock timeout: only {locked}/{self._num_drones} locked')
self._advance_phase(Phase.ARM_TAKEOFF)
# ------------------------------------------------------------------ #
# Phase 2: Arm + takeoff
# ------------------------------------------------------------------ #
def _phase_arm_takeoff(self):
elapsed = self._phase_elapsed()
for i in range(self._num_drones):
d = self._drones[i]
# Keep sending offboard + setpoint before and after arming.
self._publish_offboard_mode(i, position=True)
self._publish_trajectory(
i,
position=d.grid_pos.tolist(),
yaw=0.0,
)
# Arm and set offboard mode repeatedly for the first few seconds.
if elapsed < 5.0:
self._set_offboard_mode(i)
self._arm(i)
# Log status every 5 seconds.
elapsed_int = int(elapsed)
if elapsed_int > 0 and elapsed_int % 5 == 0 and elapsed - elapsed_int < 0.1:
armed = sum(1 for d in self._drones if d.armed)
self.get_logger().info(
f'ARM_TAKEOFF {elapsed:.0f}s: {armed}/{self._num_drones} armed, '
f'positions: ' + ', '.join(
f'D{i}=[{d.position[2]:.1f}m]'
for i, d in enumerate(self._drones)
)
)
if elapsed > self.TAKEOFF_DURATION:
self._advance_phase(Phase.HOLD_GRID)
# ------------------------------------------------------------------ #
# Phase 3: Hold grid
# ------------------------------------------------------------------ #
def _phase_hold_grid(self):
for i in range(self._num_drones):
d = self._drones[i]
self._publish_offboard_mode(i, position=True)
self._publish_trajectory(
i,
position=d.grid_pos.tolist(),
yaw=0.0,
)
if self._phase_elapsed() > self.HOLD_GRID_DURATION:
# Compute circle center as the mean of grid positions.
mean_pos = np.mean([d.grid_pos for d in self._drones], axis=0)
self._circle_center = mean_pos.copy()
self._advance_phase(Phase.TRANSITION_CIRCLE)
# ------------------------------------------------------------------ #
# Phase 4: Transition to circle (cosine interpolation)
# ------------------------------------------------------------------ #
def _phase_transition_circle(self):
elapsed = self._phase_elapsed()
alpha = 0.5 - 0.5 * math.cos(math.pi * min(elapsed / self.TRANSITION_DURATION, 1.0))
for i in range(self._num_drones):
d = self._drones[i]
# Target circle position.
cx = self._circle_center[0] + self.CIRCLE_RADIUS * math.cos(d.circle_angle)
cy = self._circle_center[1] + self.CIRCLE_RADIUS * math.sin(d.circle_angle)
cz = self._circle_center[2]
circle_pos = np.array([cx, cy, cz])
# Interpolate between grid and circle.
target = (1.0 - alpha) * d.grid_pos + alpha * circle_pos
# Yaw facing center.
yaw = math.atan2(
self._circle_center[1] - target[1],
self._circle_center[0] - target[0],
)
self._publish_offboard_mode(i, position=True)
self._publish_trajectory(i, position=target.tolist(), yaw=yaw)
if elapsed > self.TRANSITION_DURATION:
self._advance_phase(Phase.HOLD_CIRCLE)
# ------------------------------------------------------------------ #
# Phase 5: Hold circle (rotate CW)
# ------------------------------------------------------------------ #
def _phase_hold_circle(self):
elapsed = self._phase_elapsed()
omega = self.CIRCLE_SPEED / self.CIRCLE_RADIUS # rad/s
for i in range(self._num_drones):
d = self._drones[i]
# CW rotation: subtract omega*t from angle.
angle = d.circle_angle - omega * elapsed
px = self._circle_center[0] + self.CIRCLE_RADIUS * math.cos(angle)
py = self._circle_center[1] + self.CIRCLE_RADIUS * math.sin(angle)
pz = self._circle_center[2]
# Yaw facing center.
yaw = math.atan2(
self._circle_center[1] - py,
self._circle_center[0] - px,
)
self._publish_offboard_mode(i, position=True)
self._publish_trajectory(
i,
position=[px, py, pz],
yaw=yaw,
)
if elapsed > self.HOLD_CIRCLE_DURATION:
self._advance_phase(Phase.KAMIKAZE)
# ------------------------------------------------------------------ #
# Phase 6: Kamikaze dive
# ------------------------------------------------------------------ #
def _phase_kamikaze(self):
all_done = True
elapsed = self._phase_elapsed()
for i in range(self._num_drones):
d = self._drones[i]
if d.impacted:
continue
all_done = False
# Direction to target.
to_target = self._target - d.position
dist = np.linalg.norm(to_target)
# Impact detection.
if dist < self.IMPACT_DIST and d.position[2] > self.IMPACT_Z_THRESH:
self.get_logger().info(
f'[Drone {i}] Impact! pos=[{d.position[0]:.1f}, '
f'{d.position[1]:.1f}, {d.position[2]:.1f}]')
d.impacted = True
self._disarm_force(i)
continue
# Velocity toward target.
if dist > 0.01:
direction = to_target / dist
else:
direction = np.array([1.0, 0.0, 0.0])
vel = (direction * self.KAMIKAZE_SPEED).tolist()
# Yaw toward target.
yaw = math.atan2(to_target[1], to_target[0])
self._publish_offboard_mode(i, velocity=True)
self._publish_trajectory(i, velocity=vel, yaw=yaw)
# Log kamikaze progress every 10 seconds.
elapsed_int = int(elapsed)
if elapsed_int > 0 and elapsed_int % 10 == 0 and elapsed - elapsed_int < 0.1:
for i, d in enumerate(self._drones):
if not d.impacted:
dist = np.linalg.norm(self._target - d.position)
self.get_logger().info(
f'KAMIKAZE {elapsed:.0f}s: D{i} pos=[{d.position[0]:.1f}, '
f'{d.position[1]:.1f}, {d.position[2]:.1f}] '
f'dist={dist:.1f}m armed={d.armed}')
if all_done or elapsed > self.KAMIKAZE_TIMEOUT:
self._print_summary()
self._advance_phase(Phase.DONE)
# Shutdown after a short delay to let final messages flush.
self.create_timer(1.0, lambda: self._shutdown())
def _print_summary(self):
impacted = sum(1 for d in self._drones if d.impacted)
total_time = self._mission_elapsed()
self.get_logger().info('=' * 50)
self.get_logger().info('SWARM MISSION SUMMARY')
self.get_logger().info(f' Drones: {self._num_drones}')
self.get_logger().info(f' Impacts: {impacted}/{self._num_drones}')
self.get_logger().info(f' Total time: {total_time:.1f}s')
self.get_logger().info('=' * 50)
def _shutdown(self):
self.get_logger().info('Swarm controller shutting down.')
self._timer.cancel()
raise SystemExit(0)
def main(args=None):
rclpy.init(args=args)
node = SwarmController()
try:
rclpy.spin(node)
except (KeyboardInterrupt, SystemExit):
node.get_logger().info('Swarm controller exiting.')
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()
@@ -0,0 +1,156 @@
"""
Vision provider node for PX4 EKF2 external vision fusion.
Subscribes to ground truth local position and publishes noisy visual odometry
estimates, simulating an external vision system (e.g., motion capture, VIO).
One instance runs per drone. The node decimates to 30 Hz and adds configurable
Gaussian noise to position and velocity.
"""
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
from px4_msgs.msg import VehicleLocalPosition as VehicleLocalPositionGroundtruth
from px4_msgs.msg import VehicleOdometry
# QoS profile matching PX4 uORB-to-DDS bridge defaults.
PX4_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
class VisionProvider(Node):
"""Publishes noisy VehicleOdometry derived from ground truth."""
# Noise standard deviations
POS_NOISE_STD = 0.5 # metres
ANG_VEL_NOISE_STD = 0.1 # rad/s (unused but kept for completeness)
# Output rate target
OUTPUT_HZ = 30.0
OUTPUT_PERIOD_US = int(1e6 / OUTPUT_HZ)
def __init__(self):
# Declare parameters before calling super().__init__ so they are
# available immediately.
super().__init__('vision_provider')
self.declare_parameter('namespace', 'px4_0')
self.declare_parameter('instance_id', 0)
self._namespace = self.get_parameter('namespace').value
self._instance_id = self.get_parameter('instance_id').value
self.get_logger().info(
f'Starting vision_provider_{self._instance_id} '
f'for namespace /{self._namespace}'
)
self.get_logger().warn(
'Note: vehicle_local_position_groundtruth is NOT published '
'over DDS by PX4. This node is a no-op when using '
'sensor_agp_sim (SENS_EN_AGPSIM=1) for position. '
'It will only produce output if EV fusion is enabled and '
'ground truth is published via a custom DDS bridge.'
)
self._last_pub_time_us: int = 0
self._msg_count: int = 0
self._rng = np.random.default_rng()
# -- Subscriber: ground truth local position --
# Note: PX4 SIH does not publish this topic over XRCE-DDS.
# With sensor_agp_sim, EKF2 gets position internally.
gt_topic = f'/{self._namespace}/fmu/out/vehicle_local_position_groundtruth'
self._sub_gt = self.create_subscription(
VehicleLocalPositionGroundtruth,
gt_topic,
self._gt_callback,
PX4_QOS,
)
# -- Publisher: visual odometry for EKF2 --
vo_topic = f'/{self._namespace}/fmu/in/vehicle_visual_odometry'
self._pub_vo = self.create_publisher(VehicleOdometry, vo_topic, PX4_QOS)
# --------------------------------------------------------------------- #
# Callback
# --------------------------------------------------------------------- #
def _gt_callback(self, msg: VehicleLocalPositionGroundtruth):
"""Decimate ground truth and publish noisy visual odometry."""
now_us = int(self.get_clock().now().nanoseconds / 1000)
# Rate-limit to OUTPUT_HZ.
if (now_us - self._last_pub_time_us) < self.OUTPUT_PERIOD_US:
return
self._last_pub_time_us = now_us
# Build VehicleOdometry message.
vo = VehicleOdometry()
vo.timestamp = now_us
vo.timestamp_sample = msg.timestamp
# Frames: NED
vo.pose_frame = VehicleOdometry.POSE_FRAME_NED
vo.velocity_frame = VehicleOdometry.VELOCITY_FRAME_NED
# Position with Gaussian noise.
noise_pos = self._rng.normal(0.0, self.POS_NOISE_STD, size=3).astype(np.float32)
vo.position = [
msg.x + float(noise_pos[0]),
msg.y + float(noise_pos[1]),
msg.z + float(noise_pos[2]),
]
# Quaternion (pass through, no noise on orientation).
vo.q = [msg.heading, 0.0, 0.0, 0.0]
# If ground truth provides a full quaternion, prefer that. The
# VehicleLocalPosition message only carries heading; construct a
# yaw-only quaternion.
half_yaw = msg.heading / 2.0
vo.q = [
float(np.cos(half_yaw)),
0.0,
0.0,
float(np.sin(half_yaw)),
]
# Velocity with small noise.
noise_vel = self._rng.normal(0.0, 0.1, size=3).astype(np.float32)
vo.velocity = [
msg.vx + float(noise_vel[0]),
msg.vy + float(noise_vel[1]),
msg.vz + float(noise_vel[2]),
]
# Angular velocity (not used, set to zero).
vo.angular_velocity = [0.0, 0.0, 0.0]
# Variances (squared standard deviations).
vo.position_variance = [0.25, 0.25, 0.25] # 0.5^2
vo.orientation_variance = [0.01, 0.01, 0.01]
vo.velocity_variance = [0.1, 0.1, 0.1]
self._pub_vo.publish(vo)
def main(args=None):
rclpy.init(args=args)
node = VisionProvider()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down vision provider.')
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()
+86
View File
@@ -0,0 +1,86 @@
CONFIG_PLATFORM_POSIX=y
CONFIG_BOARD_TESTING=y
CONFIG_BOARD_ETHERNET=y
CONFIG_BOARD_ROOT_PATH="."
CONFIG_DRIVERS_CAMERA_TRIGGER=y
CONFIG_DRIVERS_GPS=y
CONFIG_DRIVERS_GNSS_SEPTENTRIO=y
CONFIG_DRIVERS_OSD_MSP_OSD=y
CONFIG_DRIVERS_TONE_ALARM=y
CONFIG_MODULES_AIRSHIP_ATT_CONTROL=y
CONFIG_MODULES_AIRSPEED_SELECTOR=y
CONFIG_MODULES_ATTITUDE_ESTIMATOR_Q=y
CONFIG_MODULES_CAMERA_FEEDBACK=y
CONFIG_MODULES_COMMANDER=y
CONFIG_MODULES_CONTROL_ALLOCATOR=y
CONFIG_MODULES_DATAMAN=y
CONFIG_MODULES_EKF2=y
CONFIG_EKF2_VERBOSE_STATUS=y
CONFIG_MODULES_EVENTS=y
CONFIG_MODULES_FLIGHT_MODE_MANAGER=y
CONFIG_MODULES_FW_ATT_CONTROL=y
CONFIG_MODULES_FW_AUTOTUNE_ATTITUDE_CONTROL=y
CONFIG_MODULES_FW_MODE_MANAGER=y
CONFIG_MODULES_FW_LATERAL_LONGITUDINAL_CONTROL=y
CONFIG_FIGURE_OF_EIGHT=y
CONFIG_MODULES_FW_RATE_CONTROL=y
CONFIG_MODULES_GIMBAL=y
CONFIG_MODULES_GYRO_CALIBRATION=y
CONFIG_MODULES_GYRO_FFT=y
CONFIG_MODULES_LAND_DETECTOR=y
CONFIG_MODULES_LANDING_TARGET_ESTIMATOR=y
CONFIG_MODULES_LOAD_MON=y
CONFIG_MODULES_LOCAL_POSITION_ESTIMATOR=y
CONFIG_MODULES_LOGGER=y
CONFIG_MODULES_MAG_BIAS_ESTIMATOR=y
CONFIG_MODULES_MANUAL_CONTROL=y
CONFIG_MODULES_MAVLINK=y
CONFIG_MODULES_MC_ATT_CONTROL=y
CONFIG_MODULES_MC_AUTOTUNE_ATTITUDE_CONTROL=y
CONFIG_MODULES_MC_HOVER_THRUST_ESTIMATOR=y
CONFIG_MODULES_MC_POS_CONTROL=y
CONFIG_MODULES_MC_RATE_CONTROL=y
CONFIG_MODULES_NAVIGATOR=y
CONFIG_MODE_NAVIGATOR_VTOL_TAKEOFF=y
CONFIG_NUM_MISSION_ITMES_SUPPORTED=10000
CONFIG_MODULES_PAYLOAD_DELIVERER=y
CONFIG_MODULES_RC_UPDATE=y
CONFIG_MODULES_REPLAY=y
CONFIG_MODULES_ROVER_ACKERMANN=y
CONFIG_MODULES_ROVER_DIFFERENTIAL=y
CONFIG_MODULES_ROVER_MECANUM=y
CONFIG_MODULES_SENSORS=y
CONFIG_COMMON_SIMULATION=y
CONFIG_MODULES_SIMULATION_GZ_MSGS=n
CONFIG_MODULES_SIMULATION_GZ_BRIDGE=n
CONFIG_MODULES_SIMULATION_GZ_PLUGINS=n
CONFIG_MODULES_SIMULATION_SENSOR_AGP_SIM=y
CONFIG_MODULES_TEMPERATURE_COMPENSATION=y
CONFIG_MODULES_UUV_ATT_CONTROL=y
CONFIG_MODULES_UUV_POS_CONTROL=y
CONFIG_MODULES_UXRCE_DDS_CLIENT=y
CONFIG_MODULES_VTOL_ATT_CONTROL=y
CONFIG_SYSTEMCMDS_ACTUATOR_TEST=y
CONFIG_SYSTEMCMDS_BSONDUMP=y
CONFIG_SYSTEMCMDS_DYN=y
CONFIG_SYSTEMCMDS_FAILURE=y
CONFIG_SYSTEMCMDS_LED_CONTROL=y
CONFIG_SYSTEMCMDS_PARAM=y
CONFIG_SYSTEMCMDS_PERF=y
CONFIG_SYSTEMCMDS_SD_BENCH=y
CONFIG_SYSTEMCMDS_SHUTDOWN=y
CONFIG_SYSTEMCMDS_SYSTEM_TIME=y
CONFIG_SYSTEMCMDS_TOPIC_LISTENER=y
CONFIG_SYSTEMCMDS_TUNE_CONTROL=y
CONFIG_SYSTEMCMDS_UORB=y
CONFIG_SYSTEMCMDS_VER=y
CONFIG_SYSTEMCMDS_WORK_QUEUE=y
CONFIG_EXAMPLES_DYN_HELLO=y
CONFIG_EXAMPLES_FAKE_GPS=y
CONFIG_EXAMPLES_FAKE_IMU=y
CONFIG_EXAMPLES_FAKE_MAGNETOMETER=y
CONFIG_EXAMPLES_HELLO=y
CONFIG_EXAMPLES_PX4_MAVLINK_DEBUG=y
CONFIG_EXAMPLES_PX4_SIMPLE_APP=y
CONFIG_EXAMPLES_WORK_ITEM=y
CONFIG_MODULES_SPACECRAFT=n
@@ -52,6 +52,7 @@ if(PX4_PLATFORM MATCHES "posix")
set(models
airplane
quadx
quadx_vision
xvert
standard_vtol
hex