feat(tools): add DroneCAN sensor latency calibration scripts

Cross-correlation scripts to measure CAN transport delay from flight
logs. Compares CAN sensor signals against the FC's directly-connected
IMU to estimate the total pipeline latency.

- range_sensor_latency.py: cross-correlates range rate with IMU-derived
  vertical velocity
- flow_sensor_latency.py: cross-correlates flow sensor gyro with FC gyro
This commit is contained in:
Jacob Dahl 2026-04-01 11:55:13 -08:00
parent 197a1a6214
commit 2c8a3e9c43
2 changed files with 927 additions and 0 deletions

View File

@ -0,0 +1,402 @@
#!/usr/bin/env python3
"""
Optical flow CAN transport latency analysis.
Estimates the pipeline delay from the flow sensor measurement to the FC-side
sensor_optical_flow timestamp by cross-correlating the flow sensor's onboard
gyro (delta_angle) with the FC's own gyro (sensor_combined).
The flow sensor's delta_angle is an integrated gyro over each measurement
interval. Dividing by integration_timespan gives the average angular rate,
which should match the FC's gyro with only a CAN transport delay offset.
Usage:
python3 flow_sensor_latency.py <log.ulg> [--output-dir <dir>]
"""
import argparse
import os
import sys
import numpy as np
from scipy import signal as sig
try:
from pyulog import ULog
except ImportError:
print("Error: pyulog not installed. Run: pip install pyulog", file=sys.stderr)
sys.exit(1)
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
except ImportError:
print("Error: matplotlib not installed. Run: pip install matplotlib", file=sys.stderr)
sys.exit(1)
# ---------------------------------------------------------------------------
# ULog helpers
# ---------------------------------------------------------------------------
def get_topic(ulog, name, multi_id=0):
for d in ulog.data_list:
if d.name == name and d.multi_id == multi_id:
return d
return None
def us_to_s(ts_us, start_us):
return (ts_us.astype(np.int64) - np.int64(start_us)) / 1e6
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def butter_highpass(cutoff_hz, fs, order=2):
nyq = fs / 2.0
return sig.butter(order, cutoff_hz / nyq, btype="high")
def parabolic_peak(cc, lags_s):
"""Refine cross-correlation peak with parabolic interpolation."""
idx = np.argmax(np.abs(cc))
if idx == 0 or idx == len(cc) - 1:
return lags_s[idx], cc[idx]
y0, y1, y2 = np.abs(cc[idx - 1]), np.abs(cc[idx]), np.abs(cc[idx + 1])
dt = lags_s[1] - lags_s[0]
denom = y0 - 2 * y1 + y2
if abs(denom) < 1e-12:
return lags_s[idx], cc[idx]
offset = 0.5 * (y0 - y2) / denom
return lags_s[idx] + offset * dt, cc[idx]
# ---------------------------------------------------------------------------
# Data extraction
# ---------------------------------------------------------------------------
def extract_flow_gyro(ulog):
"""Extract angular rate from the flow sensor's onboard gyro."""
d = get_topic(ulog, "sensor_optical_flow")
if d is None:
return None
t0 = ulog.start_timestamp
t = us_to_s(d.data["timestamp_sample"], t0)
dt_us = d.data["integration_timespan_us"].astype(np.float64)
quality = d.data["quality"]
# Convert integrated delta_angle to angular rate (rad/s)
valid = (quality > 0) & (dt_us > 100)
t = t[valid]
dt_s = dt_us[valid] / 1e6
rate_x = d.data["delta_angle[0]"][valid] / dt_s
rate_y = d.data["delta_angle[1]"][valid] / dt_s
fs = 1.0 / np.median(np.diff(t))
return {"time_s": t, "rate_x": rate_x, "rate_y": rate_y, "fs": fs}
def extract_fc_gyro(ulog):
"""Extract FC gyro rate from sensor_combined (highest rate available)."""
d = get_topic(ulog, "sensor_combined")
if d is None:
return None
t0 = ulog.start_timestamp
t = us_to_s(d.data["timestamp"], t0)
gx = d.data["gyro_rad[0]"]
gy = d.data["gyro_rad[1]"]
fs = 1.0 / np.median(np.diff(t))
return {"time_s": t, "rate_x": gx, "rate_y": gy, "fs": fs}
# ---------------------------------------------------------------------------
# Cross-correlation
# ---------------------------------------------------------------------------
def cross_correlate_delay(ref_time, ref_signal, test_time, test_signal,
max_lag_s=0.1, grid_fs=None):
"""
Cross-correlate two signals on a common time grid.
Positive lag = test is delayed relative to ref.
"""
t_start = max(ref_time[0], test_time[0])
t_end = min(ref_time[-1], test_time[-1])
if t_end - t_start < 1.0:
return None
if grid_fs is None:
grid_fs = max(1.0 / np.median(np.diff(ref_time)),
1.0 / np.median(np.diff(test_time)))
grid_fs = max(grid_fs, 1000.0)
t_grid = np.arange(t_start, t_end, 1.0 / grid_fs)
ref_i = np.interp(t_grid, ref_time, ref_signal)
test_i = np.interp(t_grid, test_time, test_signal)
# HP filter to remove bias
b_hp, a_hp = butter_highpass(0.5, grid_fs, order=2)
ref_i = sig.filtfilt(b_hp, a_hp, ref_i)
test_i = sig.filtfilt(b_hp, a_hp, test_i)
max_lag_samples = int(max_lag_s * grid_fs)
# np.correlate(a, b)[mid + k] peaks at k = -D when b is delayed by D
cc_full = np.correlate(ref_i, test_i, mode="full")
mid = len(ref_i) - 1
cc = cc_full[mid - max_lag_samples:mid + max_lag_samples + 1]
# Negate lags so positive = test delayed
lags = -np.arange(-max_lag_samples, max_lag_samples + 1) / grid_fs
norm = np.sqrt(np.sum(ref_i**2) * np.sum(test_i**2))
cc = cc / norm if norm > 0 else cc
peak_lag, peak_cc = parabolic_peak(cc, lags)
return {"lags_s": lags, "cc": cc, "peak_lag_s": peak_lag, "peak_cc": peak_cc,
"grid_fs": grid_fs}
# ---------------------------------------------------------------------------
# Plotting
# ---------------------------------------------------------------------------
def topic_info(time_s, name, field, role):
"""Return dict of topic metadata for the methodology table."""
n = len(time_s)
dur = time_s[-1] - time_s[0] if n > 1 else 0
dt = np.median(np.diff(time_s)) if n > 1 else 0
rate = 1.0 / dt if dt > 0 else 0
return {"topic": name, "field": field.strip(), "role": role,
"samples": n, "rate_hz": rate, "duration_s": dur}
def format_topic_table(infos):
"""Format a list of topic_info dicts into a fixed-width table string."""
hdr = (f" {'Topic':<25s} {'Field':<22s} {'Role':<10s}"
f" {'Samples':>7s} {'Rate':>8s} {'Duration':>8s}")
sep = " " + "-" * len(hdr.strip())
rows = [hdr, sep]
for t in infos:
rows.append(
f" {t['topic']:<25s} {t['field']:<22s} {t['role']:<10s}"
f" {t['samples']:>7d} {t['rate_hz']:>7.1f}Hz {t['duration_s']:>7.1f}s"
)
return "\n".join(rows)
def make_report(log_path, flow_gyro, fc_gyro, xcorr_x, xcorr_y, output_dir,
topic_lines):
log_name = os.path.splitext(os.path.basename(log_path))[0]
pdf_path = os.path.join(output_dir, f"{log_name}_flow_latency.pdf")
with PdfPages(pdf_path) as pdf:
# --- Page 1: Raw gyro overlay ---
fig, axes = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
fig.suptitle(f"Flow Sensor Latency — {log_name}", fontsize=13)
for ax, axis_label, flow_key, fc_key in [
(axes[0], "X (roll)", "rate_x", "rate_x"),
(axes[1], "Y (pitch)", "rate_y", "rate_y"),
]:
ax.plot(fc_gyro["time_s"], fc_gyro[fc_key], "b-", lw=0.4,
alpha=0.6, label="FC gyro")
ax.plot(flow_gyro["time_s"], flow_gyro[flow_key], "r-", lw=0.4,
alpha=0.6, label="flow gyro")
ax.set_ylabel(f"{axis_label} rate [rad/s]")
ax.legend(fontsize=8)
ax.grid(True, alpha=0.3)
axes[1].set_xlabel("Time [s]")
plt.tight_layout()
pdf.savefig(fig)
plt.close(fig)
# --- Page 2: Cross-correlation ---
fig, axes = plt.subplots(2, 1, figsize=(12, 7))
fig.suptitle("Cross-Correlation: flow gyro vs FC gyro", fontsize=13)
for ax, xc, label in [
(axes[0], xcorr_x, "X axis (roll)"),
(axes[1], xcorr_y, "Y axis (pitch)"),
]:
lags_ms = xc["lags_s"] * 1000
ax.plot(lags_ms, xc["cc"], "b-", lw=1.0)
peak_ms = xc["peak_lag_s"] * 1000
ax.axvline(peak_ms, color="r", ls="--", lw=1.5,
label=f"peak = {peak_ms:.1f} ms (r = {xc['peak_cc']:.3f})")
ax.axvline(0, color="gray", ls=":", lw=0.8)
ax.set_xlabel("Lag [ms] (positive = flow lags FC)")
ax.set_ylabel("Normalized correlation")
ax.set_title(label)
ax.legend(fontsize=10)
ax.grid(True, alpha=0.3)
plt.tight_layout()
pdf.savefig(fig)
plt.close(fig)
# --- Page 3: Aligned overlay ---
avg_delay = 0.5 * (xcorr_x["peak_lag_s"] + xcorr_y["peak_lag_s"])
fig, axes = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
fig.suptitle(f"After delay correction ({avg_delay*1000:.1f} ms)", fontsize=13)
t_start = max(flow_gyro["time_s"][0], fc_gyro["time_s"][0])
t_end = min(flow_gyro["time_s"][-1], fc_gyro["time_s"][-1])
t_grid = np.arange(t_start, t_end, 1.0 / 1000.0)
for ax, axis_label, flow_key, fc_key in [
(axes[0], "X (roll)", "rate_x", "rate_x"),
(axes[1], "Y (pitch)", "rate_y", "rate_y"),
]:
fc_i = np.interp(t_grid, fc_gyro["time_s"], fc_gyro[fc_key])
flow_i = np.interp(t_grid, flow_gyro["time_s"] - avg_delay,
flow_gyro[flow_key])
ax.plot(t_grid, fc_i, "b-", lw=0.5, alpha=0.6, label="FC gyro")
ax.plot(t_grid, flow_i, "r-", lw=0.5, alpha=0.6,
label=f"flow gyro (shifted {avg_delay*1000:.1f} ms)")
ax.set_ylabel(f"{axis_label} rate [rad/s]")
ax.legend(fontsize=8)
ax.grid(True, alpha=0.3)
axes[1].set_xlabel("Time [s]")
plt.tight_layout()
pdf.savefig(fig)
plt.close(fig)
# --- Page 4: Methodology ---
delay_x_ms = xcorr_x["peak_lag_s"] * 1000
delay_y_ms = xcorr_y["peak_lag_s"] * 1000
grid_hz = xcorr_x["grid_fs"]
resolution_ms = 1000.0 / grid_hz
topic_table = format_topic_table(topic_lines)
fig = plt.figure(figsize=(12, 9))
fig.text(0.05, 0.95, "Methodology", fontsize=14, fontweight="bold",
verticalalignment="top")
text = (
"This script estimates the total pipeline delay between the flow sensor\n"
"measurement and its FC-side timestamp.\n"
"\n"
f"{topic_table}\n"
"\n"
f" Cross-correlation grid: {grid_hz:.0f} Hz (resolution: {resolution_ms:.1f} ms)\n"
"\n"
f" Result: X (roll): {delay_x_ms:+.1f} ± {resolution_ms/2:.1f} ms "
f"(r = {xcorr_x['peak_cc']:.3f})\n"
f" Y (pitch): {delay_y_ms:+.1f} ± {resolution_ms/2:.1f} ms "
f"(r = {xcorr_y['peak_cc']:.3f})\n"
f" Average: {avg_delay*1000:+.1f} ± {resolution_ms/2:.1f} ms\n"
"\n"
"Method:\n"
" 1. Extract the flow sensor's onboard gyro (test signal):\n"
" - Read sensor_optical_flow.delta_angle[0,1] (integrated gyro, radians).\n"
" - Divide by integration_timespan_us to get average angular rate (rad/s).\n"
" - The flow sensor's IMU (e.g. ICM42688P on ARK Flow) measures the same\n"
" rotation as the FC gyro, but its timestamp includes CAN transport delay.\n"
"\n"
" 2. Extract the FC's own gyro (reference signal, no CAN delay):\n"
" - Read sensor_combined.gyro_rad[0,1] (instantaneous rate, rad/s).\n"
" - This is the FC's directly-connected IMU.\n"
"\n"
" 3. Cross-correlate each axis independently:\n"
" - Interpolate both signals onto a common 1000 Hz grid.\n"
" - High-pass filter at 0.5 Hz (zero-phase) to remove bias.\n"
" - Compute normalized cross-correlation.\n"
" - Parabolic interpolation around the peak gives sub-sample resolution.\n"
" Positive lag = flow sensor timestamp is late (delayed).\n"
" Accuracy is ± half a grid sample with parabolic interpolation.\n"
)
fig.text(0.05, 0.88, text, fontsize=9.5, fontfamily="monospace",
verticalalignment="top")
pdf.savefig(fig)
plt.close(fig)
return pdf_path
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Flow sensor CAN latency analysis")
parser.add_argument("log", help="ULog file (.ulg)")
parser.add_argument("--output-dir", help="Output directory (default: logs/<name>/)")
args = parser.parse_args()
log_path = os.path.abspath(args.log)
if not os.path.isfile(log_path):
print(f"Error: {log_path} not found", file=sys.stderr)
sys.exit(1)
if args.output_dir:
output_dir = args.output_dir
else:
log_name = os.path.splitext(os.path.basename(log_path))[0]
script_dir = os.path.dirname(os.path.abspath(__file__))
px4_root = os.path.abspath(os.path.join(script_dir, "..", ".."))
output_dir = os.path.join(px4_root, "logs", log_name)
os.makedirs(output_dir, exist_ok=True)
print(f"Loading {log_path} ...")
ulog = ULog(log_path)
flow_gyro = extract_flow_gyro(ulog)
if flow_gyro is None or len(flow_gyro["time_s"]) < 50:
print("Error: insufficient sensor_optical_flow data", file=sys.stderr)
sys.exit(1)
fc_gyro = extract_fc_gyro(ulog)
if fc_gyro is None or len(fc_gyro["time_s"]) < 50:
print("Error: insufficient sensor_combined data", file=sys.stderr)
sys.exit(1)
print(f"Flow gyro: {len(flow_gyro['time_s'])} samples, {flow_gyro['fs']:.0f} Hz")
print(f"FC gyro: {len(fc_gyro['time_s'])} samples, {fc_gyro['fs']:.0f} Hz")
# Cross-correlate each axis independently
xcorr_x = cross_correlate_delay(
fc_gyro["time_s"], fc_gyro["rate_x"],
flow_gyro["time_s"], flow_gyro["rate_x"],
max_lag_s=0.1, grid_fs=1000.0,
)
xcorr_y = cross_correlate_delay(
fc_gyro["time_s"], fc_gyro["rate_y"],
flow_gyro["time_s"], flow_gyro["rate_y"],
max_lag_s=0.1, grid_fs=1000.0,
)
if xcorr_x is None or xcorr_y is None:
print("Error: cross-correlation failed (insufficient overlap)", file=sys.stderr)
sys.exit(1)
avg_delay = 0.5 * (xcorr_x["peak_lag_s"] + xcorr_y["peak_lag_s"])
print(f"\n{'='*50}")
print(f"Estimated flow sensor pipeline delay:")
print(f" X axis (roll): {xcorr_x['peak_lag_s']*1000:+.1f} ms "
f"(r = {xcorr_x['peak_cc']:.3f})")
print(f" Y axis (pitch): {xcorr_y['peak_lag_s']*1000:+.1f} ms "
f"(r = {xcorr_y['peak_cc']:.3f})")
print(f" Average: {avg_delay*1000:+.1f} ms")
print(f"{'='*50}")
topic_lines = [
topic_info(flow_gyro["time_s"], "sensor_optical_flow", "delta_angle[0,1]", "test"),
topic_info(fc_gyro["time_s"], "sensor_combined", "gyro_rad[0,1]", "reference"),
]
pdf_path = make_report(log_path, flow_gyro, fc_gyro, xcorr_x, xcorr_y, output_dir,
topic_lines)
print(f"\nReport: {pdf_path}")
return pdf_path
if __name__ == "__main__":
main()

View File

@ -0,0 +1,525 @@
#!/usr/bin/env python3
"""
Range sensor CAN transport latency analysis.
Estimates the total pipeline delay from AFBRS50 measurement to the FC-side
distance_sensor timestamp by cross-correlating range rate with IMU-derived
vertical velocity. The lag at peak correlation is the transport delay.
Method:
1. Rotate body-frame accel to NED using attitude quaternion, subtract gravity
2. High-pass filter and integrate to get IMU vertical velocity (drift-free)
3. Differentiate distance_sensor to get range rate
4. Cross-correlate range rate with -vz_imu (sign: range decreases when vz>0)
5. Parabolic interpolation around peak for sub-sample resolution
Usage:
python3 range_sensor_latency.py <log.ulg> [--output-dir <dir>]
"""
import argparse
import os
import sys
import numpy as np
from scipy import signal as sig
try:
from pyulog import ULog
except ImportError:
print("Error: pyulog not installed. Run: pip install pyulog", file=sys.stderr)
sys.exit(1)
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
except ImportError:
print("Error: matplotlib not installed. Run: pip install matplotlib", file=sys.stderr)
sys.exit(1)
GRAVITY = 9.80665
# ---------------------------------------------------------------------------
# ULog helpers
# ---------------------------------------------------------------------------
def get_topic(ulog, name, multi_id=0):
for d in ulog.data_list:
if d.name == name and d.multi_id == multi_id:
return d
return None
def us_to_s(ts_us, start_us):
return (ts_us.astype(np.int64) - np.int64(start_us)) / 1e6
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def butter_highpass(cutoff_hz, fs, order=2):
nyq = fs / 2.0
return sig.butter(order, cutoff_hz / nyq, btype="high")
def butter_lowpass(cutoff_hz, fs, order=2):
nyq = fs / 2.0
return sig.butter(order, cutoff_hz / nyq, btype="low")
def quat_rotate_z(q0, q1, q2, q3, ax, ay, az):
"""Rotate body-frame vector to NED and return only the Z (down) component."""
return (2.0 * (q1 * q3 - q0 * q2) * ax
+ 2.0 * (q2 * q3 + q0 * q1) * ay
+ (q0**2 - q1**2 - q2**2 + q3**2) * az)
def parabolic_peak(cc, lags_s):
"""Refine cross-correlation peak with parabolic interpolation."""
idx = np.argmax(np.abs(cc))
if idx == 0 or idx == len(cc) - 1:
return lags_s[idx], cc[idx]
y0, y1, y2 = np.abs(cc[idx - 1]), np.abs(cc[idx]), np.abs(cc[idx + 1])
dt = lags_s[1] - lags_s[0]
# Parabolic interpolation: offset from idx in samples
offset = 0.5 * (y0 - y2) / (y0 - 2 * y1 + y2)
return lags_s[idx] + offset * dt, cc[idx]
# ---------------------------------------------------------------------------
# Data extraction
# ---------------------------------------------------------------------------
def extract_imu_vz(ulog):
"""
Build IMU-derived vertical velocity:
1. Body accel from sensor_combined (high rate)
2. Attitude quaternion from vehicle_attitude (interpolated)
3. Rotate to NED, subtract gravity kinematic az
4. HP-filter + integrate vz (drift-free)
"""
sc = get_topic(ulog, "sensor_combined")
att = get_topic(ulog, "vehicle_attitude")
if sc is None or att is None:
return None
t0 = ulog.start_timestamp
t_imu = us_to_s(sc.data["timestamp"], t0)
ax = sc.data["accelerometer_m_s2[0]"]
ay = sc.data["accelerometer_m_s2[1]"]
az = sc.data["accelerometer_m_s2[2]"]
t_att = us_to_s(att.data["timestamp_sample"], t0)
q0i = np.interp(t_imu, t_att, att.data["q[0]"])
q1i = np.interp(t_imu, t_att, att.data["q[1]"])
q2i = np.interp(t_imu, t_att, att.data["q[2]"])
q3i = np.interp(t_imu, t_att, att.data["q[3]"])
# NED vertical accel (kinematic = specific_force_ned + gravity)
az_ned = quat_rotate_z(q0i, q1i, q2i, q3i, ax, ay, az) + GRAVITY
fs_imu = 1.0 / np.median(np.diff(t_imu))
# HP filter to remove bias/drift, then integrate for velocity
b_hp, a_hp = butter_highpass(0.1, fs_imu, order=2)
az_hp = sig.filtfilt(b_hp, a_hp, az_ned)
dt = np.diff(t_imu, prepend=t_imu[0] - 1.0 / fs_imu)
vz_imu = np.cumsum(az_hp * dt)
# HP filter the integrated velocity too, to remove any remaining drift
vz_imu = sig.filtfilt(b_hp, a_hp, vz_imu)
return {"time_s": t_imu, "vz": vz_imu, "fs": fs_imu}
def extract_range(ulog):
"""Extract valid distance sensor measurements."""
d = get_topic(ulog, "distance_sensor")
if d is None:
return None
t0 = ulog.start_timestamp
t = us_to_s(d.data["timestamp"], t0)
dist = d.data["current_distance"]
quality = d.data["signal_quality"]
# Filter: valid quality and nonzero distance
valid = (quality > 0) & (dist > 0.01)
return {"time_s": t[valid], "distance": dist[valid]}
def extract_ekf_vz(ulog):
"""Extract EKF vertical velocity for comparison."""
d = get_topic(ulog, "vehicle_local_position")
if d is None:
return None
t0 = ulog.start_timestamp
return {
"time_s": us_to_s(d.data["timestamp"], t0),
"vz": d.data["vz"],
}
# ---------------------------------------------------------------------------
# Cross-correlation analysis
# ---------------------------------------------------------------------------
def compute_range_rate(range_data, lp_cutoff_hz=5.0):
"""Differentiate and low-pass filter range measurements."""
t = range_data["time_s"]
d = range_data["distance"]
if len(t) < 10:
return None
dt = np.diff(t)
rate = np.diff(d) / dt
t_rate = 0.5 * (t[:-1] + t[1:])
# Low-pass filter to reduce derivative noise
fs = 1.0 / np.median(dt)
if lp_cutoff_hz < fs / 2:
b_lp, a_lp = butter_lowpass(lp_cutoff_hz, fs, order=2)
rate = sig.filtfilt(b_lp, a_lp, rate)
return {"time_s": t_rate, "rate": rate, "fs": fs}
def cross_correlate_delay(ref_time, ref_signal, test_time, test_signal,
max_lag_s=0.1, grid_fs=None):
"""
Cross-correlate two signals on a common time grid.
Returns (lags_s, cc_normalized, peak_lag_s, peak_cc).
"""
# Common time window
t_start = max(ref_time[0], test_time[0])
t_end = min(ref_time[-1], test_time[-1])
if t_end - t_start < 1.0:
return None
if grid_fs is None:
grid_fs = 1.0 / min(np.median(np.diff(ref_time)),
np.median(np.diff(test_time)))
# Bump grid to at least 1000 Hz for reasonable resolution
grid_fs = max(grid_fs, 1000.0)
t_grid = np.arange(t_start, t_end, 1.0 / grid_fs)
ref_i = np.interp(t_grid, ref_time, ref_signal)
test_i = np.interp(t_grid, test_time, test_signal)
# Remove means
ref_i -= np.mean(ref_i)
test_i -= np.mean(test_i)
max_lag_samples = int(max_lag_s * grid_fs)
# np.correlate(a, b)[mid + k] peaks at k = -D when b is delayed by D.
# Negate the lag axis so positive = test delayed relative to ref.
cc_full = np.correlate(ref_i, test_i, mode="full")
mid = len(ref_i) - 1
cc = cc_full[mid - max_lag_samples:mid + max_lag_samples + 1]
lags = -np.arange(-max_lag_samples, max_lag_samples + 1) / grid_fs
# Normalize
norm = np.sqrt(np.sum(ref_i**2) * np.sum(test_i**2))
cc = cc / norm if norm > 0 else cc
peak_lag, peak_cc = parabolic_peak(cc, lags)
return {"lags_s": lags, "cc": cc, "peak_lag_s": peak_lag, "peak_cc": peak_cc,
"grid_fs": grid_fs}
# ---------------------------------------------------------------------------
# Plotting
# ---------------------------------------------------------------------------
def topic_info(time_s, name, field, role):
"""Return dict of topic metadata for the methodology table."""
n = len(time_s)
dur = time_s[-1] - time_s[0] if n > 1 else 0
dt = np.median(np.diff(time_s)) if n > 1 else 0
rate = 1.0 / dt if dt > 0 else 0
return {"topic": name, "field": field.strip(), "role": role,
"samples": n, "rate_hz": rate, "duration_s": dur}
def format_topic_table(infos):
"""Format a list of topic_info dicts into a fixed-width table string."""
hdr = (f" {'Topic':<25s} {'Field':<22s} {'Role':<10s}"
f" {'Samples':>7s} {'Rate':>8s} {'Duration':>8s}")
sep = " " + "-" * len(hdr.strip())
rows = [hdr, sep]
for t in infos:
rows.append(
f" {t['topic']:<25s} {t['field']:<22s} {t['role']:<10s}"
f" {t['samples']:>7d} {t['rate_hz']:>7.1f}Hz {t['duration_s']:>7.1f}s"
)
return "\n".join(rows)
def make_report(log_path, range_data, range_rate, imu_vz, ekf_vz, xcorr_imu,
xcorr_ekf, output_dir, topic_lines):
"""Generate PDF report."""
log_name = os.path.splitext(os.path.basename(log_path))[0]
pdf_path = os.path.join(output_dir, f"{log_name}_range_latency.pdf")
with PdfPages(pdf_path) as pdf:
# --- Page 1: Raw data overview ---
fig, axes = plt.subplots(3, 1, figsize=(12, 9), sharex=True)
fig.suptitle(f"Range Sensor Latency Analysis — {log_name}", fontsize=13)
axes[0].plot(range_data["time_s"], range_data["distance"], "b-", lw=0.6)
axes[0].set_ylabel("Range [m]")
axes[0].set_title("Distance sensor")
axes[0].grid(True, alpha=0.3)
axes[1].plot(range_rate["time_s"], range_rate["rate"], "r-", lw=0.6,
label="range rate")
axes[1].set_ylabel("Range rate [m/s]")
axes[1].set_title(f"Range rate (LP {5.0:.0f} Hz)")
axes[1].grid(True, alpha=0.3)
axes[2].plot(imu_vz["time_s"], -imu_vz["vz"], "g-", lw=0.4, alpha=0.7,
label="-vz_imu")
if ekf_vz is not None:
axes[2].plot(ekf_vz["time_s"], -ekf_vz["vz"], "k-", lw=0.8,
alpha=0.8, label="-vz_ekf")
axes[2].set_ylabel("Velocity [m/s]")
axes[2].set_xlabel("Time [s]")
axes[2].set_title("Vertical velocity (negated, for comparison with range rate)")
axes[2].legend(fontsize=8)
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
pdf.savefig(fig)
plt.close(fig)
# --- Page 2: Cross-correlation results ---
n_plots = 1 + (1 if xcorr_ekf is not None else 0)
fig, axes = plt.subplots(n_plots, 1, figsize=(12, 4 * n_plots))
if n_plots == 1:
axes = [axes]
fig.suptitle("Cross-Correlation: range rate vs vertical velocity", fontsize=13)
for ax, xc, label in [
(axes[0], xcorr_imu, "IMU-derived vz"),
] + ([(axes[1], xcorr_ekf, "EKF vz")] if xcorr_ekf is not None else []):
lags_ms = xc["lags_s"] * 1000
ax.plot(lags_ms, xc["cc"], "b-", lw=1.0)
peak_ms = xc["peak_lag_s"] * 1000
ax.axvline(peak_ms, color="r", ls="--", lw=1.5,
label=f"peak = {peak_ms:.1f} ms (r = {xc['peak_cc']:.3f})")
ax.axvline(0, color="gray", ls=":", lw=0.8)
ax.set_xlabel("Lag [ms] (positive = range lags reference)")
ax.set_ylabel("Normalized correlation")
ax.set_title(f"Reference: {label} ({xc['grid_fs']:.0f} Hz grid)")
ax.legend(fontsize=10)
ax.grid(True, alpha=0.3)
plt.tight_layout()
pdf.savefig(fig)
plt.close(fig)
# --- Page 3: Aligned overlay ---
fig, axes = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
fig.suptitle("Range rate vs -vz_imu: before and after delay correction",
fontsize=13)
# Common time window for overlay
t_start = max(range_rate["time_s"][0], imu_vz["time_s"][0])
t_end = min(range_rate["time_s"][-1], imu_vz["time_s"][-1])
t_grid = np.arange(t_start, t_end, 1.0 / 1000.0)
rr_i = np.interp(t_grid, range_rate["time_s"], range_rate["rate"])
vz_i = np.interp(t_grid, imu_vz["time_s"], -imu_vz["vz"])
# Normalize for visual comparison
rr_norm = (rr_i - np.mean(rr_i)) / (np.std(rr_i) + 1e-10)
vz_norm = (vz_i - np.mean(vz_i)) / (np.std(vz_i) + 1e-10)
axes[0].plot(t_grid, vz_norm, "g-", lw=0.6, alpha=0.7, label="-vz_imu")
axes[0].plot(t_grid, rr_norm, "r-", lw=0.6, alpha=0.7, label="range rate")
axes[0].set_title("Before correction (original timestamps)")
axes[0].set_ylabel("Normalized")
axes[0].legend(fontsize=8)
axes[0].grid(True, alpha=0.3)
# Shift range rate by estimated delay
delay = xcorr_imu["peak_lag_s"]
rr_shifted = np.interp(t_grid, range_rate["time_s"] - delay,
range_rate["rate"])
rr_s_norm = (rr_shifted - np.mean(rr_shifted)) / (np.std(rr_shifted) + 1e-10)
axes[1].plot(t_grid, vz_norm, "g-", lw=0.6, alpha=0.7, label="-vz_imu")
axes[1].plot(t_grid, rr_s_norm, "r-", lw=0.6, alpha=0.7,
label=f"range rate (shifted {delay*1000:.1f} ms)")
axes[1].set_title(f"After correction (delay = {delay*1000:.1f} ms)")
axes[1].set_xlabel("Time [s]")
axes[1].set_ylabel("Normalized")
axes[1].legend(fontsize=8)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
pdf.savefig(fig)
plt.close(fig)
# --- Page 4: Methodology ---
delay_ms = xcorr_imu["peak_lag_s"] * 1000
grid_hz = xcorr_imu["grid_fs"]
resolution_ms = 1000.0 / grid_hz
topic_table = format_topic_table(topic_lines)
fig = plt.figure(figsize=(12, 9))
fig.text(0.05, 0.95, "Methodology", fontsize=14, fontweight="bold",
verticalalignment="top")
text = (
"This script estimates the total pipeline delay between the range sensor\n"
"measurement and its FC-side timestamp.\n"
"\n"
f"{topic_table}\n"
"\n"
f" Cross-correlation grid: {grid_hz:.0f} Hz (resolution: {resolution_ms:.1f} ms)\n"
"\n"
f" Result: {delay_ms:+.1f} ± {resolution_ms/2:.1f} ms "
f"(r = {xcorr_imu['peak_cc']:.3f})\n"
"\n"
"Method:\n"
" 1. Build an IMU-derived vertical velocity (reference signal):\n"
" - Rotate sensor_combined accelerometer to NED using vehicle_attitude\n"
" quaternion, subtract gravity to get kinematic vertical acceleration.\n"
" - High-pass filter at 0.1 Hz (zero-phase) and integrate to get vz.\n"
" - High-pass filter the velocity to remove drift.\n"
" This signal has no CAN delay (FC IMU is directly connected).\n"
"\n"
" 2. Compute range rate (test signal):\n"
" - Differentiate distance_sensor.current_distance via finite differences.\n"
" - Low-pass filter at 5 Hz (zero-phase) to reduce derivative noise.\n"
" For a downward-facing sensor, range_rate ≈ -vz.\n"
"\n"
" 3. Cross-correlate:\n"
" - Interpolate both signals onto a common 1000 Hz grid.\n"
" - Remove means and compute normalized cross-correlation.\n"
" - The lag at peak correlation is the estimated delay.\n"
" - Parabolic interpolation around the peak gives sub-sample resolution.\n"
" Positive lag = range sensor timestamp is late (delayed).\n"
" Accuracy is ± half a grid sample with parabolic interpolation.\n"
)
fig.text(0.05, 0.88, text, fontsize=9.5, fontfamily="monospace",
verticalalignment="top")
pdf.savefig(fig)
plt.close(fig)
return pdf_path
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Range sensor CAN latency analysis")
parser.add_argument("log", help="ULog file (.ulg)")
parser.add_argument("--output-dir", help="Output directory (default: logs/<name>/)")
args = parser.parse_args()
log_path = os.path.abspath(args.log)
if not os.path.isfile(log_path):
print(f"Error: {log_path} not found", file=sys.stderr)
sys.exit(1)
# Output directory
if args.output_dir:
output_dir = args.output_dir
else:
log_name = os.path.splitext(os.path.basename(log_path))[0]
script_dir = os.path.dirname(os.path.abspath(__file__))
px4_root = os.path.abspath(os.path.join(script_dir, "..", ".."))
output_dir = os.path.join(px4_root, "logs", log_name)
os.makedirs(output_dir, exist_ok=True)
print(f"Loading {log_path} ...")
ulog = ULog(log_path)
# --- Extract data ---
range_data = extract_range(ulog)
if range_data is None or len(range_data["time_s"]) < 20:
print("Error: insufficient distance_sensor data", file=sys.stderr)
sys.exit(1)
range_rate = compute_range_rate(range_data, lp_cutoff_hz=5.0)
if range_rate is None:
print("Error: could not compute range rate", file=sys.stderr)
sys.exit(1)
imu_vz = extract_imu_vz(ulog)
if imu_vz is None:
print("Error: could not build IMU vertical velocity "
"(need sensor_combined + vehicle_attitude)", file=sys.stderr)
sys.exit(1)
ekf_vz = extract_ekf_vz(ulog)
# --- Cross-correlation ---
print(f"Distance sensor: {len(range_data['time_s'])} valid samples, "
f"{range_rate['fs']:.0f} Hz")
print(f"IMU accel: {len(imu_vz['time_s'])} samples, {imu_vz['fs']:.0f} Hz")
# range_rate ≈ -vz for downward-facing sensor (range decreases when descending)
# So correlate range_rate with -vz_imu
xcorr_imu = cross_correlate_delay(
imu_vz["time_s"], -imu_vz["vz"],
range_rate["time_s"], range_rate["rate"],
max_lag_s=0.1, grid_fs=1000.0,
)
if xcorr_imu is None:
print("Error: cross-correlation failed (insufficient overlap)", file=sys.stderr)
sys.exit(1)
xcorr_ekf = None
if ekf_vz is not None:
xcorr_ekf = cross_correlate_delay(
ekf_vz["time_s"], -ekf_vz["vz"],
range_rate["time_s"], range_rate["rate"],
max_lag_s=0.1, grid_fs=1000.0,
)
# --- Results ---
delay_ms = xcorr_imu["peak_lag_s"] * 1000
print(f"\n{'='*50}")
print(f"Estimated range sensor pipeline delay:")
print(f" IMU cross-correlation: {delay_ms:+.1f} ms "
f"(r = {xcorr_imu['peak_cc']:.3f})")
if xcorr_ekf is not None:
print(f" EKF cross-correlation: {xcorr_ekf['peak_lag_s']*1000:+.1f} ms "
f"(r = {xcorr_ekf['peak_cc']:.3f})")
print(f"{'='*50}")
if abs(delay_ms) < 1.0:
print("Note: delay < 1ms — may not be resolvable at this sample rate")
elif delay_ms < 0:
print("Note: negative delay suggests range leads IMU — "
"check sensor orientation / data quality")
# --- PDF report ---
# Build topic summary for methodology page
att = get_topic(ulog, "vehicle_attitude")
att_time = us_to_s(att.data["timestamp_sample"], ulog.start_timestamp) if att else np.array([])
topic_lines = [
topic_info(range_data["time_s"], "distance_sensor", "current_distance", "test"),
topic_info(imu_vz["time_s"], "sensor_combined", "accelerometer_m_s2", "reference"),
topic_info(att_time, "vehicle_attitude", "q[0..3]", "reference"),
]
if ekf_vz is not None:
topic_lines.append(
topic_info(ekf_vz["time_s"], "vehicle_local_position", "vz", "comparison"))
pdf_path = make_report(log_path, range_data, range_rate, imu_vz, ekf_vz,
xcorr_imu, xcorr_ekf, output_dir, topic_lines)
print(f"\nReport: {pdf_path}")
return pdf_path
if __name__ == "__main__":
main()